RollingSink - question on a failure scenario

2016-06-29 Thread vpra...@gmail.com
Hi,

Is there a chance of data loss if there is a failure between the checkpoint
completion and when "notifyCheckpointComplete" is invoked?

The pending files are moved to their final state in the "notifyCheckpointComplete"
method. So if there is a failure in this method, or just before the method is
invoked, the data in the pending files would be lost. Am I missing something?

Thanks,
Prabhu 





Parameters to Control Intra-node Parallelism

2016-06-29 Thread Saliya Ekanayake
Hi,

We are trying to scale some of our scientific applications written in
Flink. A few questions on tuning Flink performance.

1. What parameters are available to control parallelism within a node?
2. Does Flink support shared memory-based messaging within a node (without
doing TCP calls)?
3. Is there support for an InfiniBand interconnect?

Thank you,
Saliya

-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Checkpointing very large state in RocksDB?

2016-06-29 Thread Daniel Li
When RocksDB holds a very large state, is there a concern over the time it
takes to checkpoint the RocksDB data to HDFS? Is asynchronous
checkpointing a recommended practice here?


https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html

"The RocksDBStateBackend holds in-flight data in a RocksDB
 data base that is (per default) stored in the
TaskManager data directories. Upon checkpointing, the whole RocksDB data
base will be checkpointed into the configured file system and directory.
Minimal metadata is stored in the JobManager’s memory (or, in
high-availability mode, in the metadata checkpoint).

The RocksDBStateBackend is encouraged for:

   - Jobs with very large state, long windows, large key/value states.
   - All high-availability setups."
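
For reference, a minimal sketch of enabling the RocksDB backend
programmatically (the checkpoint URI and interval below are only
placeholders, not taken from an actual setup):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// snapshots of the RocksDB instance are copied to this file system path
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
env.enableCheckpointing(60000) // checkpoint every 60 s (illustrative)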


thx
Daniel


Switch to skip the stream alignment during a checkpoint?

2016-06-29 Thread Daniel Li
I am reading the Stream Checkpointing doc below, but somehow I couldn't find
that "switch" in any other Apache Flink docs. Have any of you tried this
switch?


https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

"Flink has a switch to skip the stream alignment during a checkpoint.
Checkpoint snapshots are still drawn as soon as an operator has seen the
checkpoint barrier from each input."
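
For reference, a minimal sketch of what I assume this switch corresponds to
in the DataStream API, namely the at-least-once checkpointing mode (the
interval value is only illustrative):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// AT_LEAST_ONCE skips the barrier alignment phase; EXACTLY_ONCE keeps it
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE)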


thx
Daniel


Re: Question regarding logging capabilities in flink

2016-06-29 Thread Sharma, Samiksha
Just wanted to follow up and ask whether there are any updates regarding this
query.

Thanks

Samiksha Sharma
Engineer
HERE Predictive Analytics

HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps






On 6/24/16, 12:14 PM, "Sharma, Samiksha"  wrote:

>Hi,
>
>I think what I wanted to know is: when I add some logging to my flink job,
>why do I not see those log statements on Yarn?
>
>For example, when I add logging to the following snippet from the WordCount
>example, I do not see it in the yarn logs when I run the job. However, I see
>it in the client log on my side, but not on yarn. So, is there a way I can
>see these on yarn as well, along with the jobmanager and taskmanager logs?
>
>
>
>public class WordCount {
>
>public static void main(String[] args) throws Exception {
>
>Logger LOGGER = LoggerFactory.getLogger(WordCount.class);
>LOGGER.info("This message is testing for Logging in this job");
>LOGGER.warn("This is just TESTING WARN");
>
>
>
>// Checking input parameters
>final ParameterTool params = ParameterTool.fromArgs(args);
>System.out.println("Usage: WordCount --input  --output ");
>
>
>
>
>Please let me know if you have any clarification questions.
>
>Thanks
>Samiksha Sharma
>
>
>
>
>
>
>
>On 6/24/16, 4:53 AM, "Maximilian Michels"  wrote:
>
>>Hi,
>>
>>Flink prints the Yarn application id during deployment of the cluster.
>>You can then query the logs from Yarn using the `yarn logs
>>-applicationId ` command.
>>
>>Please have a look at
>>https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YarnCommands.html#logs
>>
>>Cheers,
>>Max
>>
>>On Thu, Jun 23, 2016 at 7:44 PM, Sharma, Samiksha
>> wrote:
>>> Hi,
>>>
>>> I was reading this link regarding logging in flink jobs
>>> (https://ci.apache.org/projects/flink/flink-docs-master/internals/logging.html)
>>> and with modifications to the log4j files I am able to see logs in the
>>> flink/log directory when I run the job standalone or on Yarn, but I was
>>> more interested in seeing the logs I added in my job when I view logs in
>>> the Yarn UI.
>>> Is there a way in Flink to enable the user to see the logs added in a
>>> job on Yarn rather than only on the client side?
>>>
>>> Thanks
>>> Samiksha
>>>
>>>
>



Flink on YARN - how to resize a running cluster?

2016-06-29 Thread Josh
I'm running a Flink cluster as a YARN application, started by:
./bin/yarn-session.sh -n 4 -jm 2048 -tm 4096 -d

There are 2 worker nodes, so each is allocated 2 task managers. There is a
stateful Flink job running on the cluster with a parallelism of 2.

If I now want to increase the number of worker nodes to 3, and add 2 extra
task managers, and then increase the job parallelism, how should I do this?

I'm using EMR, so adding an extra worker node and making it available to
YARN is easy to do via the AWS console. But I haven't been able to find any
information in Flink docs about how to resize a running Flink cluster on
YARN. Is it possible to resize it while the YARN application is running, or
do I need to stop the YARN application and redeploy the cluster? Also do I
need to redeploy my Flink job from a savepoint to increase its parallelism,
or do I do this while the job is running?

I tried redeploying the cluster having added a third worker node, via:

> yarn application -kill myflinkcluster

> ./bin/yarn-session.sh -n 6 -jm 2048 -tm 4096 -d

(note increasing the number of task managers from 4 to 6)

Surprisingly, this redeployed a Flink cluster with 4 task managers (not 6!)
and restored my job from the last checkpoint.

Can anyone point me in the right direction?

Thanks,

Josh


Re: Optimizations not performed - please confirm

2016-06-29 Thread Fabian Hueske
Yes, that was my fault. I'm used to auto reply-all on my desktop machine,
but my phone just did a simple reply.
Sorry for the confusion,
Fabian



2016-06-29 19:24 GMT+02:00 Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr>:

> Thank you, Aljoscha!
> I received a similar update from Fabian, only now I see the user list was
> not in CC.
>
> Fabian::*The optimizer hasn’t been touched (except for bugfixes and new
> operators) for quite some time.*
> *These limitations are still present and I don’t expect them to be removed
> anytime soon. IMO, it is more likely that certain optimizations like join
> reordering will be done for Table API / SQL queries by the Calcite
> optimizer and pushed through the Flink Dataset optimizer.*
>
> I agree, for join reordering optimisations it makes sense to rely on
> Calcite.
> My goal is to understand how current documentation correlates to the
> Flink’s framework status.
>
> I did an experimental study where I compared Flink and Spark for many
> workloads at very large scale (I’ll share the results soon) and I would
> like to develop a few ideas on top of Flink (from the results Flink is the
> winner in most of the use cases and it is our choice for the platform on
> which to develop and grow).
>
> My interest is in understanding more about Flink today. I am familiar with
> most of the papers written, I am watching the documentation also.
> I am looking at the DataSet API, runtime and current architecture.
>
> Best,
> Ovidiu
>
> On 29 Jun 2016, at 17:27, Aljoscha Krettek  wrote:
>
> Hi,
> I think this document is still up-to-date since not much was done in these
> parts of the code for the 1.0 release and after that.
>
> Maybe Timo can give some insights into what optimizations are done in the
> Table API/SQL that will be released in an updated version in 1.1.
>
> Cheers,
> Aljoscha
>
> +Timo, Explicitly adding Timo
>
> On Tue, 28 Jun 2016 at 21:41 Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
>> Hi,
>>
>> The optimizer internals described in this document [1] are probably not
>> up-to-date.
>> Can you please confirm if this is still valid:
>>
>> *“The following optimizations are not performed*
>>
>>- *Join reordering (or operator reordering in general): Joins /
>>Filters / Reducers are not re-ordered in Flink. This is a high opportunity
>>optimization, but with high risk in the absence of good estimates about 
>> the
>>data characteristics. Flink is not doing these optimizations at this 
>> point.*
>>- *Index vs. Table Scan selection: In Flink, all data sources are
>>always scanned. The data source (the input format) may apply clever
>>mechanism to not scan all the data, but pre-select and project. Examples
>>are the RCFile / ORCFile / Parquet input formats."*
>>
>> Any update of this page will be very helpful.
>>
>> Thank you.
>>
>> Best,
>> Ovidiu
>> [1] https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals
>>
>
>


Re: Documentation for translation of Job graph to Execution graph

2016-06-29 Thread Bajaj, Abhinav
Hi Robert,

Thanks for the helpful reply.
I have a couple of follow-up questions on your reply - "In general, we recommend 
running one JobManager per job”
I understand how this can be achieved when running on Yarn, i.e. by submitting 
single Flink jobs.

Is there some other way of configuring Flink to run a single JobManager per job?

Is there a plan to add the job id or name to the logs?

Thanks,
Abhi

From: Robert Metzger <rmetz...@apache.org>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Tuesday, June 21, 2016 at 8:23 AM
To: "user@flink.apache.org" <user@flink.apache.org>, Till Rohrmann <trohrm...@apache.org>
Cc: Aljoscha Krettek <aljos...@apache.org>
Subject: Re: Documentation for translation of Job graph to Execution graph

Hi,
the link has been added newly, yes.

Regarding Q1, since there is no documentation right now, I have to refer you to 
our code. In the JobManager.scala class, there is a method "private def 
submitJob(jobGraph, ...") where the ExecutionGraph is created. I think that's a 
good starting point for looking through the code. (I also added Till to the 
message if he wants to chime in)

Q2: Currently, Flink doesn't add the job name to the logs, so it's indeed not 
very easy to separate the log entries generated by different jobs. In general, 
we recommend running one JobManager per job (multiple jobs are of course also 
supported).




On Sat, Jun 18, 2016 at 1:41 AM, Bajaj, Abhinav <abhinav.ba...@here.com> wrote:
Hi,

Thanks for sharing this link. I had not seen it before; maybe it was newly 
added in the 1.0 docs. I will go through it.

In general, there are two things I am trying to understand and get comfortable 
with -

  1.  How a Job graph is translated to Execution graph. The logs and monitoring 
APIs are for the Execution graph. So, I need to map them to the Job graph. I am 
trying to bridge this gap.
  2.  The job manager & task manager logs are tricky to decipher. Especially 
when there are multiple jobs running. Is there a way to filter the logs for a 
single job ?

~ Abhi


From: Aljoscha Krettek <aljos...@apache.org>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Friday, June 17, 2016 at 2:31 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Documentation for translation of Job graph to Execution graph

Hi,
I'm afraid there is no documentation besides the link that you posted and this 
one: 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html.

With what parts are you having trouble? Maybe I can help.

Cheers,
Aljoscha

On Thu, 16 Jun 2016 at 19:31 Bajaj, Abhinav <abhinav.ba...@here.com> wrote:
Hi,

When troubleshooting a flink job, it is tricky to map the Job graph 
(application code) to the logs & monitoring REST APIs.

So, I am trying to find documentation on how a Job graph is translated to 
Execution graph.
I found this - 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheduling.html

Any detailed documentation on the design and code components will be helpful.

Thanks,
Abhi



How to avoid breaking states when upgrading Flink job?

2016-06-29 Thread Josh
Hi all,
Is there any information out there on how to avoid breaking saved
states/savepoints when making changes to a Flink job and redeploying it?

I want to know how to avoid exceptions like this:

java.lang.RuntimeException: Failed to deserialize state handle and
setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
com.me.flink.MyJob$$anon$1$$anon$7$$anon$4


The best information I could find in the docs is here:

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html


Having made the suggested changes to my job (i.e. giving a uid to
every stateful sink and map function), what changes to the
job/topology are then allowed/not allowed?
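
For reference, the uid assignment mentioned above looks roughly like the
following sketch (the source, counting logic, and names are illustrative
only, not my actual job):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val counted = env.socketTextStream("localhost", 9999)
  .map(line => (line, 1))
  .keyBy(0)
  .sum(1)
  .uid("my-running-count") // stable ID used to match the operator's state on restore
counted.print()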


If I'm 'naming' my states by providing uids, why does Flink need to
look for a specific class, like
com.me.flink.MyJob$$anon$1$$anon$7$$anon$4?


Thanks for any advice,

Josh


Re: Optimizations not performed - please confirm

2016-06-29 Thread Ovidiu-Cristian MARCU
Thank you, Aljoscha!
I received a similar update from Fabian, only now I see the user list was not 
in CC.

Fabian::The optimizer hasn’t been touched (except for bugfixes and new 
operators) for quite some time.
These limitations are still present and I don’t expect them to be removed 
anytime soon. IMO, it is more likely that certain optimizations like join 
reordering will be done for Table API / SQL queries by the Calcite optimizer 
and pushed through the Flink Dataset optimizer.

I agree, for join reordering optimisations it makes sense to rely on Calcite.
My goal is to understand how current documentation correlates to the Flink’s 
framework status.

I did an experimental study where I compared Flink and Spark for many 
workloads at very large scale (I’ll share the results soon) and I would like to 
develop a few ideas on top of Flink (from the results Flink is the winner in 
most of the use cases and it is our choice for the platform on which to develop 
and grow).

My interest is in understanding more about Flink today. I am familiar with most 
of the papers written, I am watching the documentation also.
I am looking at the DataSet API, runtime and current architecture.

Best,
Ovidiu

> On 29 Jun 2016, at 17:27, Aljoscha Krettek  wrote:
> 
> Hi,
> I think this document is still up-to-date since not much was done in these 
> parts of the code for the 1.0 release and after that.
> 
> Maybe Timo can give some insights into what optimizations are done in the 
> Table API/SQL that will be released in an updated version in 1.1.
> 
> Cheers,
> Aljoscha
> 
> +Timo, Explicitly adding Timo
> 
> On Tue, 28 Jun 2016 at 21:41 Ovidiu-Cristian MARCU <ovidiu-cristian.ma...@inria.fr> wrote:
> Hi,
> 
> The optimizer internals described in this document [1] are probably not 
> up-to-date.
> Can you please confirm if this is still valid:
> 
> “The following optimizations are not performed
> Join reordering (or operator reordering in general): Joins / Filters / 
> Reducers are not re-ordered in Flink. This is a high opportunity 
> optimization, but with high risk in the absence of good estimates about the 
> data characteristics. Flink is not doing these optimizations at this point.
> Index vs. Table Scan selection: In Flink, all data sources are always 
> scanned. The data source (the input format) may apply clever mechanism to not 
> scan all the data, but pre-select and project. Examples are the RCFile / 
> ORCFile / Parquet input formats."
> Any update of this page will be very helpful.
> 
> Thank you.
> 
> Best,
> Ovidiu
> [1] https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals 
> 


Re: How to count number of records received per second in processing time while using event time characteristic

2016-06-29 Thread Aljoscha Krettek
Hi,
you can explicitly specify that you want processing-time windows like this:

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(...)

Also note that the timestamp you append in
"writeAsCsv("records-per-second-" + System.currentTimeMillis())" is taken only
once, at the time this expression is evaluated, which happens when your
program is started.
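
Putting both remarks together, a minimal sketch of counting records per second
of processing time, assuming "s" is the socketTextStream from your snippet and
the rest of the job keeps the event-time characteristic:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// processing-time windows need neither timestamps nor watermarks
val recordsPerSecond = s
  .map(line => Tuple1(1))
  .keyBy(0)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .sum(0)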

Best,
Aljoscha

On Tue, 28 Jun 2016 at 17:33 Saiph Kappa  wrote:

> Hi,
>
> I have a flink streaming application and I want to count records received
> per second (as a way of measuring the throughput of my application).
> However, I am using the EventTime time characteristic, as follows:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val s = env.socketTextStream("localhost", 1234)
>
> s.map(line => Tuple1(1)).keyBy(0).timeWindow(Time.seconds(1)).sum(0)
>   .writeAsCsv("records-per-second-" + System.currentTimeMillis())
>
> val mainStrean = s.map(line => {
>   val Array(p1, p2) = line.split(" ")
>   (p1, p2.toInt)
> })
>   .assignAscendingTimestamps(p => System.currentTimeMillis())
>
> which naturally gives me this error:
>
> [error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
> timestamp (= no timestamp marker). Is the time characteristic set to
> 'ProcessingTime', or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>
> How can I do this?
>
>
> Thanks.
>


Re: Best way to read property file in flink

2016-06-29 Thread Aljoscha Krettek
Hi,
could you load the properties file when starting the application and add it
to the user functions so that it would be serialized along with them? This
way, you wouldn't have to ship the file to each node.
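
A minimal sketch of that approach (the path handling and names are
illustrative; java.util.Properties is Serializable, so an instance passed to
the function's constructor is shipped to the TaskManagers with the function):

import java.io.FileInputStream
import java.util.Properties
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._

// carries the properties loaded on the client into the user function
class PropertyAwareMapper(props: Properties) extends RichMapFunction[String, String] {
  override def map(value: String): String = value + props.getProperty("suffix", "")
}

object PropertyFileJob {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.load(new FileInputStream(args(0))) // read once, on the client

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 9999)
      .map(new PropertyAwareMapper(props))
      .print()
    env.execute("property-file-example")
  }
}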

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 12:09 Janardhan Reddy 
wrote:

> We are running multiple flink jobs inside a yarn session.  For each flink
> job we have a separate property file. We are copying the property files to
> each node in the cluster before submitting the job.
>
> Is there a better way to read the properties file? Can we read it from
> HDFS or S3? Do we need to write custom code, or does Flink already expose an
> API to read properties from a file in HDFS or S3?
>
>
>


Re: maximum size of window

2016-06-29 Thread Aljoscha Krettek
Hi,
the result of splitting by key is that processing can easily be distributed
among the workers because the windows for individual keys can be processed
independently. This should improve cluster utilization.
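
For example, in a keyed window like the following sketch (the source and the
tuple layout are assumptions), the windows of different keys can be processed
in parallel, and the ReduceFunction keeps only one aggregate per key and window:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
// (key, value) pairs; the source and field layout are illustrative
val events: DataStream[(String, Long)] = env.socketTextStream("localhost", 9999)
  .map { line => val Array(k, v) = line.split(" "); (k, v.toLong) }
val totals = events
  .keyBy(_._1)
  .timeWindow(Time.minutes(10))
  .reduce((a, b) => (a._1, a._2 + b._2)) // one aggregate per key and window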

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 17:26 Vishnu Viswanath 
wrote:

> Hi,
>
> Thank you for the responses.
> I am not sure if I will be able to use Fold/Reduce function, but I will
> keep that in mind.
>
> I have one more question: what is the implication of having a key that
> splits the data into windows of very small size (i.e. a large number of
> small windows)?
>
> Thanks and Regards,
> Vishnu Viswanath,
>
> On Tue, Jun 28, 2016 at 5:04 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> one thing to add: if you use a ReduceFunction or a FoldFunction for your
>> window the state will not grow with bigger window sizes or larger numbers
>> of elements because the result is eagerly computed. In that case, state
>> size is only dependent on the number of individual keys.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 28 Jun 2016 at 10:36 Kostas Kloudas 
>> wrote:
>>
>>> Hi Vishnu,
>>>
>>> RocksDB allows for storing the window contents on disk when the state of
>>> a window becomes too big.
>>> BUT when you have to trigger and apply the computation of your window
>>> function on that big window,
>>> then all of its state is loaded in memory.
>>>
>>> So although during the window formation phase, RocksDB allows you to not
>>>  worry about storage space,
>>> when it is time to fire your computation, then you have to consider how
>>> much RAM you have and if the
>>> window fits in it.
>>>
>>> Regards,
>>> Kostas
>>>
>>>
>>> On Jun 28, 2016, at 1:25 AM, Vishnu Viswanath <
>>> vishnu.viswanat...@gmail.com> wrote:
>>>
>>> Hi Kostas,
>>>
>>> Thank you.
>>> Yes 2) was exactly what I wanted to know.
>>>
>>> - So if I am using RocksDB as state backend, does that mean that I don't
>>> have to worry much about the memory available per node since RocksDB will
>>> use RAM and Disk to store the window state?
>>>
>>> Regards,
>>> Vishnu
>>>
>>>
>>> On Mon, Jun 27, 2016 at 6:19 PM, Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
 Hi Vishnu,

 I hope the following will help answer your question:

 1) Elements are first split by key (apart from global windows) and then
 are put into windows. In other words, windows are keyed.
 2) A window belonging to a certain key is handled by a single node. In
 other words, no matter how big the window is, its
 state (the elements it contains) will never be split between
 two or more nodes.
 3) Where the state is stored, depends on your state backend. Currently
 Flink supports an in-memory one, a filesystem one, and
 a rocksDB one which is in the middle (first in-memory and then
 disk when needed). Of course you can implement your own.

 From the above, you can see that if you use the memory-backed state
 backend, then your window size is limited by the memory
 available at each of your nodes. If you use the fs state backend, then
 your state is stored on disk. Finally, rocksDB will initially
 use RAM and then spill on disk when no more memory is available.

 Here I have to add that the window documentation is currently being
 re-written to explain new features introduced in Flink 1.1,
 which include more flexible handling of late events and more explicit
 state garbage collection.

 So please stay tuned!

 I hope this helps at answering your question,
 Kostas

 > On Jun 27, 2016, at 8:22 PM, Vishnu Viswanath <
 vishnu.viswanat...@gmail.com> wrote:
 >
 > Hi All,
 >
 > - Is there any restriction on the size of a window in Flink with
 respect to the memory of the nodes?
 > - What happens if a window size grows more than size of a node, will
 it be split into multiple nodes?
 >
 > if I am going to have a huge window, should I have fewer nodes with
 more memory.
 > Is there any documentation on how memory is managed/handled in the
 case of windows and also in the case of joins.
 >
 > Regards,
 > Vishnu


>>>
>>>
>
>


Re: Optimizations not performed - please confirm

2016-06-29 Thread Aljoscha Krettek
Hi,
I think this document is still up-to-date since not much was done in these
parts of the code for the 1.0 release and after that.

Maybe Timo can give some insights into what optimizations are done in the
Table API/SQL that will be released in an updated version in 1.1.

Cheers,
Aljoscha

+Timo, Explicitly adding Timo

On Tue, 28 Jun 2016 at 21:41 Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Hi,
>
> The optimizer internals described in this document [1] are probably not
> up-to-date.
> Can you please confirm if this is still valid:
>
> *“The following optimizations are not performed*
>
>- *Join reordering (or operator reordering in general): Joins /
>Filters / Reducers are not re-ordered in Flink. This is a high opportunity
>optimization, but with high risk in the absence of good estimates about the
>data characteristics. Flink is not doing these optimizations at this 
> point.*
>- *Index vs. Table Scan selection: In Flink, all data sources are
>always scanned. The data source (the input format) may apply clever
>mechanism to not scan all the data, but pre-select and project. Examples
>are the RCFile / ORCFile / Parquet input formats."*
>
> Any update of this page will be very helpful.
>
> Thank you.
>
> Best,
> Ovidiu
> [1] https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals
>


Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-06-29 Thread Martin Scholl
Other than increasing the ask.timeout, we've seen such failures being
caused by long GC pauses over bigger heaps. In such a case, you could
fiddle with a) enabling object reuse, or b) enabling off-heap memory (i.e.
taskmanager.memory.off-heap == true) to mitigate GC-induced issues a bit.
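
A minimal sketch of option a); option b) is a flink-conf.yaml entry rather
than an API call:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
// a) reuse objects across user-function calls instead of allocating new ones
env.getConfig.enableObjectReuse()
// b) in flink-conf.yaml: taskmanager.memory.off-heap: true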

Hope it helps,
Martin

On Wed, Jun 29, 2016 at 3:29 PM Ufuk Celebi  wrote:

> OK, looks like you can easily give more memory to the network stack,
> e.g. for 2 GB set
>
> taskmanager.network.numberOfBuffers = 65536
> taskmanager.network.bufferSizeInBytes = 32768
>
> For the other exception, your logs confirm that there is something
> else going on. Try increasing the akka ask timeout:
>
> akka.ask.timeout: 100 s
>
> Does this help?
>
>
> On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74...@studenti.unimore.it>
> wrote:
> > Hi Ufuk,
> >
> > so the memory available per node is 48294 megabytes per node, but I
> reserve
> > 28 by flink conf file.
> > taskmanager.heap.mb = 28672
> > taskmanager.memory.fraction = 0.7
> > taskmanager.network.numberOfBuffers = 32768
> > taskmanager.network.bufferSizeInBytes = 16384
> >
> > Anyway Follows what I found in log files.
> >
> > Follows the taskmanager log (task manager that seems failed)
> >
> > 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
> >
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
> (1/1)
> > switched to FAILED with exception.
> > java.lang.IllegalStateException: Received unexpected partition state null
> > for partition request. This is a bug.
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
> > at
> > org.apache.flink.runtime.taskmanager.TaskManager.org
> $apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:
> > 468)
> > at
> >
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > at
> >
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > at
> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> > at
> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> > at
> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> > at
> >
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > at
> >
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >
> > Follows the jobmanager log
> >
> > 2016-06-29 11:31:34,683 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
> Reduce
> > (Reduce at dima.tu.berlin.benchmark.fli
> >
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
> (1/1)
> > (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
> > 2016-06-29 11:31:34,694 INFO
> org.apache.flink.runtime.jobmanager.JobManager
> > - Status of job 71542654d427e8d0e7e01c538abe1acf (peel
> > -bundle-f

Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-06-29 Thread Ufuk Celebi
OK, looks like you can easily give more memory to the network stack,
e.g. for 2 GB set

taskmanager.network.numberOfBuffers = 65536
taskmanager.network.bufferSizeInBytes = 32768

For the other exception, your logs confirm that there is something
else going on. Try increasing the akka ask timeout:

akka.ask.timeout: 100 s

Does this help?


On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74...@studenti.unimore.it> wrote:
> Hi Ufuk,
>
> so the memory available per node is 48294 megabytes per node, but I reserve
> 28 by flink conf file.
> taskmanager.heap.mb = 28672
> taskmanager.memory.fraction = 0.7
> taskmanager.network.numberOfBuffers = 32768
> taskmanager.network.bufferSizeInBytes = 16384
>
> Anyway Follows what I found in log files.
>
> Follows the taskmanager log (task manager that seems failed)
>
> 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
> - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> switched to FAILED with exception.
> java.lang.IllegalStateException: Received unexpected partition state null
> for partition request. This is a bug.
> at
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
> at
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:
> 468)
> at
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Follows the jobmanager log
>
> 2016-06-29 11:31:34,683 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN Reduce
> (Reduce at dima.tu.berlin.benchmark.fli
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
> 2016-06-29 11:31:34,694 INFO  org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 71542654d427e8d0e7e01c538abe1acf (peel
> -bundle-flink) changed to FAILING.
> java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
> ector(sGradientDescentL2.scala:43)) -> Map (Map at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
> ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://f
> link@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
> 1 milliseconds
> at
> org.apa

Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-06-29 Thread ANDREA SPINA
Hi Ufuk,

so the memory available per node is 48294 megabytes per node, but I reserve
28 by flink conf file.
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384

Anyway Follows what I found in log files.

*Follows the taskmanager log (task manager that seems failed)*

2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
- CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
-> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null
for partition request. This is a bug.
at
org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
at org.apache.flink.runtime.taskmanager.TaskManager.org
$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:
468)
at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


*Follows the jobmanager log*

2016-06-29 11:31:34,683 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
Reduce (Reduce at dima.tu.berlin.benchmark.fli
nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
-> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
(8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
2016-06-29 11:31:34,694 INFO
 org.apache.flink.runtime.jobmanager.JobManager- Status of
job 71542654d427e8d0e7e01c538abe1acf (peel
-bundle-flink) changed to FAILING.
java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
ector(sGradientDescentL2.scala:43)) -> Map (Map at
dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
(c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://f
link@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
1 milliseconds
at
org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
at akka.dispatch.OnComplete.internal(Future.scala:246)
at akka.dispatch.OnComplete.internal(Future.scala:244)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at
scala.concurrent.forkjoin.ForkJoinTask.do

Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-06-29 Thread Ufuk Celebi
Hey Andrea! Sorry for the bad user experience.

Regarding the network buffers: you should be able to run it after
increasing the number of network buffers, just account for it when
specifying the heap size etc. You currently allocate 32768 * 16384
bytes = 512 MB for them. If you have a very long pipeline and high
parallelism, you should increase it accordingly. How much memory do
you have on your machines?

Regarding the IllegalStateException: I suspect that this is **not**
the root failure cause. The null ExecutionState can only happen if
the producer task (from which data is requested) failed during the
request. The error message is confusing and I opened a JIRA to fix it:
https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
your complete logs to see what the root cause might be, e.g. why did
the producer fail?


On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA
<74...@studenti.unimore.it> wrote:
> Hi everyone,
>
> I am running some Flink experiments with Peel benchmark
> http://peel-framework.org/ and I am struggling with exceptions: the
> environment is a 25-nodes cluster, 16 cores per nodes. The dataset is ~80GiB
> and is located on Hdfs 2.7.1. Flink version is 1.0.3.
>
> At the beginning I tried with 400 as the degree of parallelism, but an
> insufficient-number-of-network-buffers error was raised, so I changed the
> parallelism to 200. Flink
> configuration follows:
>
> jobmanager.rpc.address = ${runtime.hostname}
> akka.log.lifecycle.events = ON
> akka.ask.timeout = 300s
> jobmanager.rpc.port = 6002
> jobmanager.heap.mb = 1024
> jobmanager.web.port = 6004
> taskmanager.heap.mb = 28672
> taskmanager.memory.fraction = 0.7
> taskmanager.network.numberOfBuffers = 32768
> taskmanager.network.bufferSizeInBytes = 16384
> taskmanager.tmp.dirs =
> "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
> taskmanager.debug.memory.startLogThread = true
>
> With a parallelism of 200 the following exception will raise from a node of
> the cluster:
>
> 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
> - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> switched to FAILED with exception.
> java.lang.IllegalStateException: Received unexpected partition state null
> for partition request. This is a bug.
> at
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>
>
> The reduce code is:
>
> 43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
>
> The map code is:
>
> 68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
> 69    dimensionDS.map {
> 70      dimension =>
> 71        val values = DenseVector(Array.fill(dimension)(0.0))
> 72        values
> 73    }
> 74  }
>
> I can't figure out a solution, thank you for your help.
>
> Andrea
>
> --
> Andrea Spina
> N.Tessera: 74598
> MAT: 89369
> Ingegneria Informatica [LM] (D.M. 270)


Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-06-29 Thread ANDREA SPINA
Hi everyone,

I am running some Flink experiments with Peel benchmark
http://peel-framework.org/ and I am struggling with exceptions: the
environment is a 25-nodes cluster, 16 cores per nodes. The dataset is
~80GiB and is located on Hdfs 2.7.1. Flink version is 1.0.3.

At the beginning I tried with 400 as the degree of parallelism, but an
insufficient-number-of-network-buffers error was raised, so I changed the
parallelism to 200. Flink
configuration follows:

jobmanager.rpc.address = ${runtime.hostname}
akka.log.lifecycle.events = ON
akka.ask.timeout = 300s
jobmanager.rpc.port = 6002
jobmanager.heap.mb = 1024
jobmanager.web.port = 6004
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384
taskmanager.tmp.dirs =
"/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
taskmanager.debug.memory.startLogThread = true

With a parallelism of 200 the following exception will raise from a node of
the cluster:

2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
- CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
-> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null
for partition request. This is a bug.
at
org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)


The reduce code is:

43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)

The map code is:

68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
69    dimensionDS.map {
70      dimension =>
71        val values = DenseVector(Array.fill(dimension)(0.0))
72        values
73    }
74  }

I can't figure out a solution, thank you for your help.

Andrea

-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)


Best way to read property file in flink

2016-06-29 Thread Janardhan Reddy
We are running multiple flink jobs inside a yarn session.  For each flink
job we have a separate property file. We are copying the property files to
each node in the cluster before submitting the job.

Is there a better way to read the properties file?  Can we read it from
hdfs or s3. Do we need to write custom code or flink already exposes an api
to read properties from a file in hdfs or s3.


Re: Flink java.io.FileNotFoundException Exception with Peel Framework

2016-06-29 Thread ANDREA SPINA
Hi,

the problem was solved after I figured out there was already an instance of a
Flink TaskManager running on a node of the cluster.
Thank you,
Andrea

2016-06-28 12:17 GMT+02:00 ANDREA SPINA <74...@studenti.unimore.it>:

> Hi Max,
> thank you for the fast reply and sorry: I use flink-1.0.3.
> Yes, I tested on a dummy dataset with numberOfBuffers = 16384 and a lower
> degree of parallelism, and this solved the first exception. However, on
> the 80GiB dataset I still struggle with the second exception.
>
> Regards,
> Andrea
>
> 2016-06-28 12:08 GMT+02:00 Maximilian Michels :
>
>> Hi Andrea,
>>
>> The number of network buffers should be sufficient. Actually, assuming
>> you have 16 task slots on each of the 25 nodes, it should be enough to
>> have 16^2 * 25 * 4 = 25600 network buffers.
>>
>> See
>> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#background
>>
>> So we have to investigate a little more. Which version of Flink are you
>> using?
>>
>> Cheers,
>> Max
>>
>> On Tue, Jun 28, 2016 at 11:57 AM, ANDREA SPINA
>> <74...@studenti.unimore.it> wrote:
>> > Hi everyone,
>> >
>> > I am running some Flink experiments with Peel benchmark
>> > http://peel-framework.org/ and I am struggling with exceptions: the
>> > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is
>> ~80GiB
>> > and is located on Hdfs 2.7.1.
>> >
>> > At the beginning I tried with 400 as degree of parallelism and with the
>> > following configuration:
>> >
>> > jobmanager.rpc.address = ${runtime.hostname}
>> > akka.log.lifecycle.events = ON
>> > akka.ask.timeout = 300s
>> > jobmanager.rpc.port = 6002
>> >
>> > jobmanager.heap.mb = 1024
>> > jobmanager.web.port = 6004
>> >
>> > taskmanager.heap.mb = 28672
>> > taskmanager.memory.fraction = 0.7
>> > taskmanager.network.numberOfBuffers = 32768
>> > taskmanager.network.bufferSizeInBytes = 16384
>> > taskmanager.tmp.dirs =
>> >
>> "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
>> > taskmanager.debug.memory.startLogThread = true
>> >
>> > the following exception will raise
>> >
>> > Caused by: java.io.IOException: Insufficient number of network buffers:
>> > required 350, but only 317 available. The total number of network
>> buffers is
>> > currently set to 32768. You can increase this number by setting the
>> > configuration key 'taskmanager.network.numberOfBuffers'.
>> > at
>> >
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>> > at
>> >
>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:469)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> > So I tried different solutions, both with increasing numberOfBuffers
>> (Max
>> > value tried 98304) or decreasing the degreeOfParallelism (Min value
>> tried
>> > 300) and testing those configs with a dummy dataset seems to solve the
>> > number of buffers issue.
>> > But In each case with the 80GiB dataset now I struggle with a new
>> exception;
>> > the following with a degree of parallelism = 300 and numberOfBuffers =
>> 32768
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The program
>> > execution failed: Job execution failed.
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> > at
>> >
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> > at
>> >
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
>> > at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon$.main(FlinkSLRTrainCommon.scala:110)
>> > at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon.main(FlinkSLRTrainCommon.scala)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> > at
>> >
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> > at
>> >
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>> > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>> > Caused by: org.apache.flink.runtime.client