Re: HBase reads and back pressure

2016-06-14 Thread Christophe Salperwyck
I would need to restart it to be sure (once it gets stuck, the web interface
no longer reports the backpressure), but it seems so. I used a text file as
the output and it took 5 hours to complete:
aggregates.writeAsText("hdfs:///user/christophe/flinkHBase");

What is weird is that I get as many output lines as input rows (14B) when I
don't limit the scan. When I limit the scan (150M rows), I get one line per
hour as expected. I am investigating a bit more right now.

Thanks again!
Christophe


Re: HBase reads and back pressure

2016-06-13 Thread Fabian Hueske
Do the backpressure metrics indicate that the sink function is blocking?


Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
To continue, I implemented the ws.apply(new SummaryStatistics(), new
YourFoldFunction(), new YourWindowFunction()) variant.

It works fine when there is no sink, but when I add an HBase sink it seems
that the sink somehow blocks the flow. The sink writes very little data
into HBase, and when I limit my input to just a few sensors it works well.
Any idea?

final SingleOutputStreamOperator<Aggregate> aggregates = ws
    .apply(
        new Aggregate(),
        new FoldFunction<ANA, Aggregate>() {

            @Override
            public Aggregate fold(final Aggregate accumulator, final ANA value)
                    throws Exception {
                accumulator.addValue(value.getValue());
                return accumulator;
            }
        },
        new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {

            @Override
            public void apply(final Tuple key, final TimeWindow window,
                    final Iterable<Aggregate> input,
                    final Collector<Aggregate> out) throws Exception {
                for (final Aggregate aggregate : input) {
                    aggregate.setM((String) key.getField(0));
                    aggregate.setTime(window.getStart());
                    out.collect(aggregate);
                }
            }
        });
aggregates
    .setParallelism(10)
    .writeUsingOutputFormat(new OutputFormat<Aggregate>() {
        private static final long serialVersionUID = 1L;
        HBaseConnect hBaseConnect;
        Table table;
        final int flushSize = 100;
        List<Put> puts = new ArrayList<>();

        @Override
        public void writeRecord(final Aggregate record) throws IOException {
            puts.add(record.buildPut());
            if (puts.size() >= flushSize) {
                table.put(puts);
                // clear the buffer, otherwise the same Puts are re-written
                // (and the list keeps growing) on every subsequent flush
                puts.clear();
            }
        }

        @Override
        public void open(final int taskNumber, final int numTasks)
                throws IOException {
            hBaseConnect = new HBaseConnect();
            table = hBaseConnect.getHTable("PERF_TEST");
        }

        @Override
        public void configure(final Configuration parameters) {
            // nothing to configure
        }

        @Override
        public void close() throws IOException {
            // write the last buffered Puts
            table.put(puts);
            table.close();
            hBaseConnect.close();
        }
    });
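
A note on the sink's design: batching Puts in a List works, but the HBase
client also provides a BufferedMutator that does the client-side buffering
and flushing itself. Below is a minimal sketch of the same sink on top of it;
this is only an alternative under stated assumptions (the standard
ConnectionFactory is used directly instead of the hypothetical HBaseConnect
helper from the code above):

// Sketch only: BufferedMutator buffers mutations client-side and flushes
// them in batches, so the OutputFormat needs no List<Put> of its own.
// Uses org.apache.hadoop.hbase.client.{Connection, ConnectionFactory,
// BufferedMutator} and org.apache.hadoop.hbase.{HBaseConfiguration, TableName}.
aggregates.writeUsingOutputFormat(new OutputFormat<Aggregate>() {
    private static final long serialVersionUID = 1L;
    transient Connection connection;
    transient BufferedMutator mutator;

    @Override
    public void open(final int taskNumber, final int numTasks) throws IOException {
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        mutator = connection.getBufferedMutator(TableName.valueOf("PERF_TEST"));
    }

    @Override
    public void writeRecord(final Aggregate record) throws IOException {
        mutator.mutate(record.buildPut()); // buffered; flushed automatically
    }

    @Override
    public void configure(final Configuration parameters) {
    }

    @Override
    public void close() throws IOException {
        mutator.close(); // flushes any remaining mutations
        connection.close();
    }
});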


Re: HBase reads and back pressure

2016-06-13 Thread Maximilian Michels
Thanks!


Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
Hi,
I voted for this issue and I agree it would be nice to have.

Thx!
Christophe


Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
Hi Max,

In fact the Put would be the output of my WindowFunction. I saw Aljoscha's
reply; it seems I will need to create an intermediate class to handle that,
but that is fine.

Thx for help!
Christophe
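
For illustration, a minimal sketch of what such an intermediate class could
look like. This is hypothetical: the method names mirror the Aggregate class
used in the code posted elsewhere in this thread, while the row key and
column layout are invented for the sketch. It carries the running statistics
plus the key and window start, so the Put can be built downstream:

// Hypothetical sketch: accumulates statistics and carries the key and
// window start so a Put can be built once the window fires.
public class Aggregate implements Serializable {
    private static final long serialVersionUID = 1L;
    private final SummaryStatistics stats = new SummaryStatistics();
    private String m;    // key field
    private long time;   // window start

    public void addValue(final double value) { stats.addValue(value); }
    public void setM(final String m) { this.m = m; }
    public void setTime(final long time) { this.time = time; }

    public Put buildPut() {
        // Row key and column layout are made up for this sketch.
        final Put put = new Put(Bytes.toBytes(m + "_" + time));
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("avg"),
                Bytes.toBytes(stats.getMean()));
        return put;
    }
}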



Re: HBase reads and back pressure

2016-06-13 Thread Aljoscha Krettek
Hi,
I'm afraid this is currently a shortcoming in the API. There is this open
Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
We can't fix it before Flink 2.0, though, because we have to keep the API
stable on the Flink 1.x release line.

Cheers,
Aljoscha



Re: HBase reads and back pressure

2016-06-13 Thread Maximilian Michels
Hi Christophe,

A fold function has two inputs: the state and a record to update the
state with. So you can update the SummaryStatistics (the state) with each
incoming record (the input).

Cheers,
Max
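
As a sketch of that update step (ANA and getValue() are the input type and
accessor from the code elsewhere in this thread):

// The fold is applied eagerly to every record, so the window state is a
// single SummaryStatistics rather than all buffered elements.
new FoldFunction<ANA, SummaryStatistics>() {
    @Override
    public SummaryStatistics fold(final SummaryStatistics stats, final ANA value)
            throws Exception {
        stats.addValue(value.getValue());
        return stats;
    }
};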



Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
Thanks for the feedback and sorry that I can't try all this straight away.

Is there a way to have a different function than:
WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()

I would like to return an HBase Put and not a SummaryStatistics. So
something like this:
WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()

Christophe



Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
OK, this indicates that the operator following the source is a bottleneck.

If that's the WindowOperator, it makes sense to try the refactoring of the
WindowFunction.
Alternatively, you can try to run that operator with a higher parallelism.
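
For reference, a sketch of what raising the parallelism of just the window
operator looks like (the value 20 is arbitrary):

// Only the window operator runs with 20 parallel instances; the other
// operators keep the job-wide default.
ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction())
    .setParallelism(20);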



Re: HBase reads and back pressure

2016-06-09 Thread Christophe Salperwyck
Hi Fabian,

Thanks for the help, I will try that. The backpressure was on the source
(HBase).

Christophe



Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
Hi Christophe,

where does the backpressure appear? In front of the sink operator or before
the window operator?

In any case, I think you can improve your WindowFunction if you convert
parts of it into a FoldFunction.
The FoldFunction would take care of the statistics computation and the
WindowFunction would only assemble the result record including extracting
the start time of the window.

Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
YourWindowFunction());

This is more efficient because the FoldFunction is eagerly applied whenever
a new element is added to a window. Hence, the window only holds a single
value (the SummaryStatistics) instead of all elements added to the window.
In contrast, the WindowFunction is called only when the window is finally
evaluated.

Hope this helps,
Fabian
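
To make the shape of this refactoring concrete, here is a minimal sketch
(ANA and getValue() come from the code quoted below; note that on the Flink
1.x API the WindowFunction in this variant has to emit the same type as the
fold accumulator, which is the limitation discussed elsewhere in the thread):

// Sketch: incremental aggregation with a fold plus a window function.
// The fold updates the per-window statistics on every element; the
// window function runs once per window, at evaluation time.
ws.apply(
    new SummaryStatistics(),
    new FoldFunction<ANA, SummaryStatistics>() {
        @Override
        public SummaryStatistics fold(final SummaryStatistics stats, final ANA value)
                throws Exception {
            stats.addValue(value.getValue());
            return stats;
        }
    },
    new WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>() {
        @Override
        public void apply(final Tuple key, final TimeWindow window,
                final Iterable<SummaryStatistics> input,
                final Collector<SummaryStatistics> out) throws Exception {
            // input holds exactly one value: the folded statistics
            out.collect(input.iterator().next());
        }
    });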

2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> Hi,
>
> I am writing a program to read time series from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some averages,
> so nothing very heavy, but my HBase reads get slower and slower (I have a
> few billion points to read). The back pressure is almost always close to 1.
>
> I use custom timestamps:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> so I implemented a custom extractor based on AscendingTimestampExtractor.
>
> At the beginning I get 5M reads/s, and after 15 min just 1M reads/s; then
> it gets worse and worse. Even when I cancel the job, data are still being
> written to HBase (I built a sink similar to the example, with a cache of
> 100s of HBase Puts to be a bit more efficient).
>
> When I don't add a sink, it seems to stay at 1M reads/s.
>
> Do you have an idea why?
>
> Here is a bit of code if needed:
> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ANA>())
>     .keyBy(0)
>     .timeWindow(Time.days(1));
>
> final SingleOutputStreamOperator<Put> puts = ws.apply(
>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>
>         @Override
>         public void apply(final Tuple key, final TimeWindow window,
>                 final Iterable<ANA> input, final Collector<Put> out)
>                 throws Exception {
>
>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
>             for (final ANA ana : input) {
>                 summaryStatistics.addValue(ana.getValue());
>             }
>             final Put put = buildPut((String) key.getField(0),
>                     window.getStart(), summaryStatistics);
>             out.collect(put);
>         }
>     });
>
> And how I started Flink on YARN :
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> -Dtaskmanager.network.numberOfBuffers=4096
>
> Thanks for any feedback!
>
> Christophe
>
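
As a closing aside on the custom extractor mentioned in the original
question, a minimal sketch (getTimestamp() is a hypothetical accessor on
ANA; on Flink 1.0.x the callback also receives the previous timestamp,
while later 1.x versions use a single-argument variant):

// Sketch of a custom extractor based on AscendingTimestampExtractor,
// assuming ANA carries an epoch-millisecond timestamp.
public class ANATimestampExtractor extends AscendingTimestampExtractor<ANA> {
    private static final long serialVersionUID = 1L;

    @Override
    public long extractAscendingTimestamp(final ANA element, final long currentTimestamp) {
        return element.getTimestamp();
    }
}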