Re: Fault Tolerance for Flink Iterations

2015-04-22 Thread Markus Holzemer
Hi Stephan,
thanks for your answer! I thought about this, too, but I am not sure if
this solves the problem.
I think it works if there is only a temporary failure and the job can be
restarted with the same dop (degree of parallelism).
But it does not work if one node is permanently lost and the dop needs to
be adjusted.
If the iteration expects a sorted or hashed input, I cannot just read from
a previously stored checkpoint with a lower dop, because this property
will be lost. For example, a checkpoint written hash-partitioned across 4
tasks is no longer correctly partitioned for an iteration restarted with
dop 3.
I don't see how moving the computation from the JobManager to the driver
solves this problem.

best,
Markus

2015-04-21 19:36 GMT+02:00 Stephan Ewen se...@apache.org:

 Hi Markus!

 I see your point. My first guess would be that it would be simpler to do
 this logic in the driver program, rather
 than inside the JobManager. If the checkpoints are all written and the job
 fails, you check what was the latest completed
 checkpoint (by file) and then start the program again with the source that
 refers to those files.

 That way, you go through the proper stack (optimizer and jobgraph
 generator) that inserts all the necessary partition and
 sort operations.

 Greetings,
 Stephan



 On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer 
 holzemer.mar...@googlemail.com wrote:

  Hi everybody,
 
  I am writing my master thesis about making Flink iterations / iterative
  Flink algorithms fault tolerant.
  The first approach I implemented is a basic checkpointing, where every N
  iterations the current state is saved into HDFS.
  To do this I enabled data sinks inside iterations, then attached a new
  checkpointing sink to the beginning of each iteration. To recover from a
  previous checkpoint I cancel all tasks, add a new data source in front of
  the iteration and reschedule the tasks with a lower dop. I do this from
  within the JobManager at runtime, without starting a new job.
  The problem is that sometimes the input data to the iteration has some
  properties like a certain partitioning or sorting, and I am struggling
  with reconstructing these properties from the checkpoint source.
  I figured that an easier way to do this is to re-optimize the new plan
  (with the new source as input to the iteration) before the rescheduling.
  But in the current project structure flink-runtime has no access to
  flink-optimizer and it would be a major design break to change this.
  Does somebody have any advice on this?
 
  best,
  Markus
 



Re: Fault Tolerance for Flink Iterations

2015-04-22 Thread Stephan Ewen
I think it should circumvent many of the problems. If you re-submit the
program from the client with a lower DOP, all intermediate state (saved
by you in files) will be re-partitioned, because no assumptions about
prior properties are made.
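
Roughly what I have in mind from the driver side (untested sketch, imports
omitted; the paths, newDop, remainingIterations, and the step function are
all placeholders):

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setDegreeOfParallelism(newDop); // lower than before the failure

  // a plain file source over the checkpoint; because the plan goes
  // through the optimizer again, whatever partitioning or sorting the
  // iteration needs is re-established on it
  DataSet<Tuple2<Long, Double>> state = env
      .readCsvFile("hdfs:///checkpoints/my-job/latest")
      .types(Long.class, Double.class);

  IterativeDataSet<Tuple2<Long, Double>> iteration =
      state.iterate(remainingIterations);
  DataSet<Tuple2<Long, Double>> result =
      iteration.closeWith(step(iteration));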

I may not be getting the whole picture here, so sorry if I am not hitting
the point.

On Wed, Apr 22, 2015 at 9:44 AM, Markus Holzemer 
holzemer.mar...@googlemail.com wrote:

 Hi Stephan,
 thanks for your answer! I thought about this, too, but I am not sure if
 this solves the problem.
 I think it works if there is only a temporary failure and the job can be
 restarted with the same dop (degree of parallelism).
 But it does not work if one node is permanently lost and the dop needs to
 be adjusted.
 If the iteration expects a sorted or hashed input, I cannot just read from
 a previously stored checkpoint with a lower dop, because this property
 will be lost. For example, a checkpoint written hash-partitioned across 4
 tasks is no longer correctly partitioned for an iteration restarted with
 dop 3.
 I don't see how moving the computation from the JobManager to the driver
 solves this problem.

 best,
 Markus

 2015-04-21 19:36 GMT+02:00 Stephan Ewen se...@apache.org:

  Hi Markus!
 
  I see your point. My first guess would be that it would be simpler to do
  this logic in the driver program, rather than inside the JobManager. If
  the checkpoints are all written and the job fails, you check what was
  the latest completed checkpoint (by file) and then start the program
  again with the source that refers to those files.

  That way, you go through the proper stack (optimizer and jobgraph
  generator) that inserts all the necessary partition and sort operations.
 
  Greetings,
  Stephan
 
 
 
  On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer 
  holzemer.mar...@googlemail.com wrote:
 
   Hi everybody,
  
   I am writing my master thesis about making Flink iterations / iterative
   Flink algorithms fault tolerant.
   The first approach I implemented is a basic checkpointing, where every N
   iterations the current state is saved into HDFS.
   To do this I enabled data sinks inside iterations, then attached a new
   checkpointing sink to the beginning of each iteration. To recover from a
   previous checkpoint I cancel all tasks, add a new data source in front
   of the iteration and reschedule the tasks with a lower dop. I do this
   from within the JobManager at runtime, without starting a new job.
   The problem is that sometimes the input data to the iteration has some
   properties like a certain partitioning or sorting, and I am struggling
   with reconstructing these properties from the checkpoint source.
   I figured that an easier way to do this is to re-optimize the new plan
   (with the new source as input to the iteration) before the rescheduling.
   But in the current project structure flink-runtime has no access to
   flink-optimizer and it would be a major design break to change this.
   Does somebody have any advice on this?
  
   best,
   Markus
  
 



Fault Tolerance for Flink Iterations

2015-04-21 Thread Markus Holzemer
Hi everybody,

I am writing my master thesis about making Flink iterations / iterative
Flink algorithms fault tolerant.
The first approach I implemented is a basic checkpointing, where every N
iterations the current state is saved into HDFS.
To do this I enabled data sinks inside iterations, then attached a new
checkpointing sink to the beginning of each iteration. To recover from a
previous checkpoint I cancel all tasks, add a new data source in front of
the iteration and reschedule the tasks with a lower dop. I do this from
within the JobManager at runtime, without starting a new job.
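
The wiring looks roughly like this (simplified, imports omitted;
CheckpointingOutputFormat is part of my patched runtime, not stock Flink,
and step() stands for the actual iteration body):

  IterativeDataSet<Tuple2<Long, Double>> iteration =
      input.iterate(maxIterations);

  // my change: a sink attached inside the iteration, writing the current
  // state to HDFS every N supersteps
  iteration.output(new CheckpointingOutputFormat<Tuple2<Long, Double>>(
      "hdfs:///checkpoints/my-job", /* every N = */ 5));

  DataSet<Tuple2<Long, Double>> next = step(iteration);
  DataSet<Tuple2<Long, Double>> result = iteration.closeWith(next);
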
The problem is that sometimes the input data to the iteration has some
properties like a certain partitioning or sorting, and I am struggling
with reconstructing these properties from the checkpoint source.
I figured that an easier way to do this is to re-optimize the new plan
(with the new source as input to the iteration) before the rescheduling.
But in the current project structure flink-runtime has no access to
flink-optimizer and it would be a major design break to change this.
Does somebody have any advice on this?

best,
Markus


Re: Fault Tolerance for Flink Iterations

2015-04-21 Thread Stephan Ewen
Hi Markus!

I see your point. My first guess would be that it would be simpler to do
this logic in the driver program, rather
than inside the JobManager. If the checkpoints are all written and the job
fails, you check what was the latest completed
checkpoint (by file) and then start the program again with the source that
refers to those files.
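
Something along these lines in the driver (untested sketch, imports
omitted; the paths are invented, originalInput() stands for however the
program builds its normal input, and I am assuming your checkpoint sink
drops a _SUCCESS marker into a directory once it is completely written):

  // pick the newest checkpoint directory that was completely written
  Path root = new Path("hdfs:///checkpoints/my-job");
  FileSystem fs = root.getFileSystem();
  Path latest = null;
  for (FileStatus d : fs.listStatus(root)) {
      if (d.isDir()
              && fs.exists(new Path(d.getPath(), "_SUCCESS"))
              && (latest == null
                  || d.getPath().getName().compareTo(latest.getName()) > 0)) {
          latest = d.getPath();
      }
  }

  // start from the checkpoint if one completed, else from the beginning
  DataSet<Tuple2<Long, Double>> start = (latest == null)
      ? originalInput(env)
      : env.readCsvFile(latest.toString()).types(Long.class, Double.class);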

That way, you go through the proper stack (optimizer and jobgraph
generator) that inserts all the necessary partition and
sort operations.

Greetings,
Stephan



On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer 
holzemer.mar...@googlemail.com wrote:

 Hi everybody,

 I am writing my master thesis about making Flink iterations / iterative
 Flink algorithms fault tolerant.
 The first approach I implemented is a basic checkpointing, where every N
 iterations the current state is saved into HDFS.
 To do this I enabled data sinks inside iterations, then attached a new
 checkpointing sink to the beginning of each iteration. To recover from a
 previous checkpoint I cancel all tasks, add a new data source in front of
 the iteration and reschedule the tasks with a lower dop. I do this from
 within the JobManager at runtime, without starting a new job.
 The problem is that sometimes the input data to the iteration has some
 properties like a certain partitioning or sorting, and I am struggling
 with reconstructing these properties from the checkpoint source.
 I figured that an easier way to do this is to re-optimize the new plan
 (with the new source as input to the iteration) before the rescheduling.
 But in the current project structure flink-runtime has no access to
 flink-optimizer and it would be a major design break to change this.
 Does somebody have any advice on this?

 best,
 Markus