Re: Fault Tolerance for Flink Iterations
Hi Stephan,

thanks for your answer! I thought about this too, but I am not sure it solves the problem. It works if there is only a temporary failure and the job can be restarted with the same dop (degree of parallelism). But it does not work if one node is permanently lost and the dop needs to be adjusted. If the iteration expects a sorted or hash-partitioned input, I cannot just read from a previously stored checkpoint with a lower dop, because that property will be lost. I don't see how moving the computation from the JobManager to the driver solves this problem.

Best,
Markus
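To see why the partitioning property is lost, consider a toy calculation. The plain modulo scheme below is a simplification of Flink's actual hash partitioning, used only to show that the key-to-subtask assignment changes when the dop changes:

    // Toy illustration: which subtask a key maps to before and after
    // reducing the dop. Real Flink hashes differently; plain modulo is
    // enough to show that the assignment changes.
    public class PartitioningLostExample {
        public static void main(String[] args) {
            int oldDop = 8, newDop = 6;
            for (long key = 0; key < 4; key++) {
                int writer = (int) (Math.abs(Long.hashCode(key)) % oldDop);
                int reader = (int) (Math.abs(Long.hashCode(key)) % newDop);
                System.out.printf("key %d: checkpointed by subtask %d/%d, "
                    + "expected at subtask %d/%d%n",
                    key, writer, oldDop, reader, newDop);
            }
            // The 8 checkpoint files read by 6 source subtasks cannot
            // reproduce the hash partitioning; it must be re-established.
        }
    }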
Re: Fault Tolerance for Flink Iterations
I think it should circumvent many of the problems. If you re-submit the program from the client with a lower DOP, all intermediate state (saved by you in files) will be re-partitioned, because no assumptions about prior properties are made.

I may not be getting the whole picture here, so sorry if I am not hitting the point, actually...
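To make the re-submission path concrete, here is a minimal sketch in the DataSet API. The checkpoint path and the keyed operation are placeholders; the point is that the file source declares no partitioning, so the optimizer inserts a fresh hash partitioning at the new dop in front of any keyed operator:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ResubmitFromCheckpoint {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(6); // the reduced dop after losing a node

            // A plain file source over the last completed checkpoint; it
            // declares no partitioning or ordering, so the optimizer will
            // re-establish whatever the downstream operators require.
            DataSet<Tuple2<Long, Double>> state =
                env.readCsvFile("hdfs:///checkpoints/step-40") // hypothetical path
                   .types(Long.class, Double.class);

            // Any keyed operation now triggers a fresh hash partitioning.
            state.groupBy(0).sum(1).writeAsCsv("hdfs:///output/resumed");

            env.execute("resume from checkpoint (sketch)");
        }
    }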
Fault Tolerance for Flink Iterations
Hi everybody,

I am writing my master thesis about making Flink iterations / iterative Flink algorithms fault tolerant.

The first approach I implemented is basic checkpointing, where every N iterations the current state is saved into HDFS. To do this I enabled data sinks inside of iterations, then attached a new checkpointing sink to the beginning of each iteration. To recover from a previous checkpoint I cancel all tasks, add a new data source in front of the iteration, and reschedule the tasks with a lower dop. I do this out of the JobManager during runtime, without starting a new job.

The problem is that sometimes the input data to the iteration has properties like a certain partitioning or sorting, and I am struggling with reconstructing these properties from the checkpoint source. I figured that an easier way to do this would be to re-optimize the new plan (with the new source as input to the iteration) before rescheduling. But in the current project structure, flink-runtime has no access to flink-optimizer, and it would be a major design break to change this.

Does anybody have any advice on this?

Best,
Markus
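For context, a minimal sketch of the job shape being described. Note that stock Flink does not allow data sinks inside an iteration; the commented-out checkpoint sink is exactly the capability the patched runtime adds. Paths, the update step, and the checkpoint interval are placeholders:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.IterativeDataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CheckpointedIterationSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // initial solution, e.g. (id, value) pairs
            DataSet<Tuple2<Long, Double>> initial =
                env.readCsvFile("hdfs:///input/state").types(Long.class, Double.class);

            IterativeDataSet<Tuple2<Long, Double>> iteration = initial.iterate(100);

            DataSet<Tuple2<Long, Double>> updated = iteration.map(
                new MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> map(Tuple2<Long, Double> t) {
                        return new Tuple2<>(t.f0, t.f1 * 0.85 + 0.15); // stand-in update
                    }
                });

            // With the patched runtime, a checkpoint sink can be attached here,
            // writing the state every N supersteps, e.g.:
            //   updated.writeAsCsv("hdfs:///checkpoints/step-" + superstep);

            iteration.closeWith(updated).writeAsCsv("hdfs:///output/state");
            env.execute("iterative job with checkpoints (sketch)");
        }
    }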
Re: Fault Tolerance for Flink Iterations
Hi Markus!

I see your point. My first guess would be that it would be simpler to do this logic in the driver program rather than inside the JobManager.

If the checkpoints are all written and the job fails, you check what was the latest completed checkpoint (by file) and then start the program again with a source that refers to those files. That way, you go through the proper stack (optimizer and job graph generator), which inserts all the necessary partition and sort operations.

Greetings,
Stephan
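A minimal sketch of the driver-side recovery logic Stephan describes, finding the latest completed checkpoint by file. The step-N directory layout and the _SUCCESS completion marker are assumed conventions for illustration, not anything Flink prescribes:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckpointLookup {
        // Returns the newest completely written checkpoint directory under
        // baseDir, or null if none exists (then the job starts from scratch).
        public static String latestCompletedCheckpoint(String baseDir) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            String best = null;
            int bestStep = -1;
            for (FileStatus dir : fs.listStatus(new Path(baseDir))) {
                String name = dir.getPath().getName(); // e.g. "step-40"
                if (!dir.isDirectory() || !name.startsWith("step-")) {
                    continue;
                }
                int step = Integer.parseInt(name.substring("step-".length()));
                // A checkpoint counts as completed once its _SUCCESS marker
                // exists (an assumed convention, written after all files).
                if (fs.exists(new Path(dir.getPath(), "_SUCCESS")) && step > bestStep) {
                    bestStep = step;
                    best = dir.getPath().toString();
                }
            }
            return best;
        }
    }

The driver would then pass the returned path to env.readCsvFile(...) as the iteration's input and submit the job again at the reduced dop; because the submission goes through the client, the optimizer and job graph generator see an ordinary file source and insert the required partition and sort operations.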