Re: Restore from checkpoint
Hi Phil,

A correction: "But the error you have is a familiar error if you have written some code to handle directory paths." should read "But the error you have is a familiar error if you have written some code to handle directory paths with Java." No offence.

Best regards,
Jiadong Lu

Jiadong Lu wrote on Mon, 20 May 2024 at 14:42:
> Hi Phil,
>
> I don't have much expertise with the flink-python module, but the error
> you have is a familiar one if you have written code to handle directory
> paths.
>
> The correct forms of the path/URI are:
> 1. "/home/foo"
> 2. "file:///home/foo/boo"
> 3. "hdfs:///home/foo/boo"
> 4. or a Win32 directory path
>
> Best regards,
> Jiadong Lu
>
> On 2024/5/20 02:28, Phil Stavridis wrote:
> > Hi Lu,
> >
> > Thanks for your reply. In what way should the paths be passed to the job
> > that needs to use the checkpoint? Is the standard way using "-s :/", or
> > passing the path in the module as a Python arg?
> >
> > Kind regards,
> > Phil
> >
> >> On 18 May 2024, at 03:19, jiadong.lu wrote:
> >>
> >> Hi Phil,
> >>
> >> AFAIK, the error indicates your path was incorrect.
> >> You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or
> >> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> >>
> >> Best,
> >> Jiadong Lu
> >>
> >> On 5/18/24 2:37 AM, Phil Stavridis wrote:
> >>> Hi,
> >>> I am trying to test how checkpoints work for restoring state, but I am
> >>> not sure how to run a new instance of a Flink job, after I have
> >>> cancelled it, using the checkpoints that I store in the filesystem of
> >>> the job manager, e.g. /opt/flink/checkpoints.
> >>> I have tried passing the checkpoint as an argument to the function and
> >>> using it while setting the checkpoint, but it looks like the way it is
> >>> done is something like this:
> >>> docker-compose exec jobmanager flink run -s
> >>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py
> >>> /opt/app/flink_job.py
> >>> But I am getting this error:
> >>> Caused by: java.io.IOException: Checkpoint/savepoint path
> >>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid
> >>> file URI. Either the pointer path is invalid, or the checkpoint was
> >>> created by a different state backend.
> >>> What is wrong with the way the job is re-submitted to the cluster?
> >>> Kind regards,
> >>> Phil
Re: Restore from checkpoint
Hi Phil,

I don't have much expertise with the flink-python module, but the error you have is a familiar one if you have written code to handle directory paths.

The correct forms of the path/URI are:
1. "/home/foo"
2. "file:///home/foo/boo"
3. "hdfs:///home/foo/boo"
4. or a Win32 directory path

Best regards,
Jiadong Lu

On 2024/5/20 02:28, Phil Stavridis wrote:
> Hi Lu,
>
> Thanks for your reply. In what way should the paths be passed to the job
> that needs to use the checkpoint? Is the standard way using "-s :/", or
> passing the path in the module as a Python arg?
>
> Kind regards,
> Phil
>
>> On 18 May 2024, at 03:19, jiadong.lu wrote:
>>
>> Hi Phil,
>>
>> AFAIK, the error indicates your path was incorrect.
>> You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or
>> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
>>
>> Best,
>> Jiadong Lu
>>
>> On 5/18/24 2:37 AM, Phil Stavridis wrote:
>>> Hi,
>>> I am trying to test how checkpoints work for restoring state, but I am
>>> not sure how to run a new instance of a Flink job, after I have
>>> cancelled it, using the checkpoints that I store in the filesystem of
>>> the job manager, e.g. /opt/flink/checkpoints.
>>> I have tried passing the checkpoint as an argument to the function and
>>> using it while setting the checkpoint, but it looks like the way it is
>>> done is something like this:
>>> docker-compose exec jobmanager flink run -s
>>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py
>>> /opt/app/flink_job.py
>>> But I am getting this error:
>>> Caused by: java.io.IOException: Checkpoint/savepoint path
>>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid
>>> file URI. Either the pointer path is invalid, or the checkpoint was
>>> created by a different state backend.
>>> What is wrong with the way the job is re-submitted to the cluster?
>>> Kind regards,
>>> Phil
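[Editor's note] A minimal illustration, in plain Python with the standard library (not Flink's actual parser), of why the forms listed above work while a ':'-prefixed path does not: a leading colon leaves an empty URI scheme, so the string is neither a plain absolute path nor a valid file/hdfs URI.

```python
from urllib.parse import urlparse

def classify(path: str) -> str:
    """Classify a checkpoint location into the accepted forms above."""
    if path.startswith("/"):
        return "plain absolute path"
    parsed = urlparse(path)
    if parsed.scheme in ("file", "hdfs") and parsed.path:
        return parsed.scheme + " URI"
    return "invalid"

print(classify("/home/foo"))             # plain absolute path
print(classify("file:///home/foo/boo"))  # file URI
print(classify("hdfs:///home/foo/boo"))  # hdfs URI
# The path from the failing command in this thread:
print(classify(":/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc"))  # invalid
```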
Re: Restore from checkpoint
Hi Phil,

I think you can use the "-s :checkpointMetaDataPath" arg to resume the job from a retained checkpoint [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint

Best,
Jinzhong Li

On Mon, May 20, 2024 at 2:29 AM Phil Stavridis wrote:
> Hi Lu,
>
> Thanks for your reply. In what way should the paths be passed to the job
> that needs to use the checkpoint? Is the standard way using "-s :/", or
> passing the path in the module as a Python arg?
>
> Kind regards,
> Phil
>
>> On 18 May 2024, at 03:19, jiadong.lu wrote:
>>
>> Hi Phil,
>>
>> AFAIK, the error indicates your path was incorrect.
>> You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or
>> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
>>
>> Best,
>> Jiadong Lu
>>
>> On 5/18/24 2:37 AM, Phil Stavridis wrote:
>>> Hi,
>>> I am trying to test how checkpoints work for restoring state, but I am
>>> not sure how to run a new instance of a Flink job, after I have
>>> cancelled it, using the checkpoints that I store in the filesystem of
>>> the job manager, e.g. /opt/flink/checkpoints.
>>> I have tried passing the checkpoint as an argument to the function and
>>> using it while setting the checkpoint, but it looks like the way it is
>>> done is something like this:
>>> docker-compose exec jobmanager flink run -s
>>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py
>>> /opt/app/flink_job.py
>>> But I am getting this error:
>>> Caused by: java.io.IOException: Checkpoint/savepoint path
>>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid
>>> file URI. Either the pointer path is invalid, or the checkpoint was
>>> created by a different state backend.
>>> What is wrong with the way the job is re-submitted to the cluster?
>>> Kind regards,
>>> Phil
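[Editor's note] One way to see the fix is a tiny helper, hypothetical and not part of Flink or PyFlink, that turns a retained-checkpoint location into a valid value for `flink run -s`. In the docs' "-s :checkpointMetaDataPath", the ':' is placeholder punctuation; copying it literally into the command is what produced the "not a valid file URI" error in this thread.

```python
def savepoint_arg(path: str) -> str:
    """Build a valid value for 'flink run -s' from a checkpoint location."""
    if path.startswith(":"):
        # The ':' in ":checkpointMetaDataPath" marks a placeholder in the
        # docs; it must not appear in the real argument.
        path = path.lstrip(":")
    if "://" in path:
        return path               # already a URI (file://, hdfs://, ...)
    if path.startswith("/"):
        return "file://" + path   # absolute local path -> file URI
    raise ValueError("expected an absolute path or URI: " + repr(path))

print(savepoint_arg(":/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc"))
# file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc
```

The resulting value would then be used in place of the broken one, e.g. `flink run -s file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py`.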
Re: Restore from checkpoint
Hi Lu,

Thanks for your reply. In what way should the paths be passed to the job that needs to use the checkpoint? Is the standard way using "-s :/", or passing the path in the module as a Python arg?

Kind regards,
Phil

> On 18 May 2024, at 03:19, jiadong.lu wrote:
>
> Hi Phil,
>
> AFAIK, the error indicates your path was incorrect.
> You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or
> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
>
> Best,
> Jiadong Lu
>
> On 5/18/24 2:37 AM, Phil Stavridis wrote:
>> Hi,
>> I am trying to test how checkpoints work for restoring state, but I am
>> not sure how to run a new instance of a Flink job, after I have
>> cancelled it, using the checkpoints that I store in the filesystem of
>> the job manager, e.g. /opt/flink/checkpoints.
>> I have tried passing the checkpoint as an argument to the function and
>> using it while setting the checkpoint, but it looks like the way it is
>> done is something like this:
>> docker-compose exec jobmanager flink run -s
>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py
>> /opt/app/flink_job.py
>> But I am getting this error:
>> Caused by: java.io.IOException: Checkpoint/savepoint path
>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid
>> file URI. Either the pointer path is invalid, or the checkpoint was
>> created by a different state backend.
>> What is wrong with the way the job is re-submitted to the cluster?
>> Kind regards,
>> Phil
Re: Restore from checkpoint
Hi Phil,

AFAIK, the error indicates your path was incorrect.
You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or
'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.

Best,
Jiadong Lu

On 5/18/24 2:37 AM, Phil Stavridis wrote:
> Hi,
> I am trying to test how checkpoints work for restoring state, but I am
> not sure how to run a new instance of a Flink job, after I have
> cancelled it, using the checkpoints that I store in the filesystem of
> the job manager, e.g. /opt/flink/checkpoints.
> I have tried passing the checkpoint as an argument to the function and
> using it while setting the checkpoint, but it looks like the way it is
> done is something like this:
> docker-compose exec jobmanager flink run -s
> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py
> /opt/app/flink_job.py
> But I am getting this error:
> Caused by: java.io.IOException: Checkpoint/savepoint path
> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid
> file URI. Either the pointer path is invalid, or the checkpoint was
> created by a different state backend.
> What is wrong with the way the job is re-submitted to the cluster?
> Kind regards,
> Phil
Re: Restore from Checkpoint from local Standalone Job
Hi Sandeep,

I think it should work fine with `StandaloneCompletedCheckpointStore`. Have you checked whether your directory /Users/test/savepoint is being populated in the first place? And if so, whether the restarted job is throwing exceptions, e.g. that it cannot access those files?

Also note that cancel with savepoint is deprecated; you should be using stop with savepoint [1].

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#terminating-a-job

On Fri, 26 Mar 2021 at 18:55, Sandeep khanzode wrote:
> Hello,
>
> I was reading this:
> https://stackoverflow.com/questions/61010970/flink-resume-from-externalised-checkpoint-question
>
> I am trying to run a standalone job locally with a single job manager
> and task manager.
>
> I have enabled checkpointing as below:
>
> env.setStateBackend(new RocksDBStateBackend("file:///Users/test/checkpoint", true));
> env.enableCheckpointing(30 * 1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> After I stop my job (I also tried to cancel the job using
> bin/flink cancel -s /Users/test/savepoint), I tried to start the same
> job using:
>
> ./standalone-job.sh start-foreground test.jar --job-id
> --job-classname com.test.MyClass --fromSavepoint /Users/test/savepoint
>
> But it never restores the state, and always starts afresh.
>
> In Flink, I see this in StandaloneCompletedCheckpointStore:
>
> * {@link CompletedCheckpointStore} for JobManagers running in {@link
> HighAvailabilityMode#NONE}.
>
> public void recover() throws Exception {
>     // Nothing to do
> }
>
> Does this have something to do with not being able to restore state?
> Does this need ZooKeeper or K8s HA for functioning?
>
> Thanks,
> Sandeep
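[Editor's note] A conceptual sketch in plain Python (not Flink code, and the class below only mimics the quoted Java) of what the no-op recover() in StandaloneCompletedCheckpointStore implies: without HA, completed checkpoints live only in the JobManager's memory, so a restarted process recovers nothing unless a checkpoint path is passed explicitly (e.g. via --fromSavepoint).

```python
class StandaloneCheckpointStore:
    """Mimics StandaloneCompletedCheckpointStore: purely in-memory."""

    def __init__(self):
        self.checkpoints = []

    def add(self, checkpoint_path: str) -> None:
        self.checkpoints.append(checkpoint_path)

    def recover(self) -> None:
        # Nothing to do -- mirrors the quoted Flink method: no state
        # survives a JobManager restart in HighAvailabilityMode#NONE.
        pass

store = StandaloneCheckpointStore()
store.add("/Users/test/checkpoint/chk-42")  # hypothetical retained checkpoint

# Simulated JobManager restart: a fresh store, recover() restores nothing.
restarted = StandaloneCheckpointStore()
restarted.recover()
print(len(restarted.checkpoints))  # 0 -> the job starts afresh
```

This is why a ZooKeeper- or Kubernetes-based HA setup (which persists checkpoint pointers externally) is needed for automatic recovery, while manual restore via an explicit savepoint/checkpoint path works without HA.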