Re: Restore from checkpoint

2024-05-20 Thread archzi lu
Hi Phil,
Correction: "But the error you have is a familiar error if you have written some code to handle directory paths." --> "But the error you have is a familiar error if you have written some code to handle directory paths with Java."

No offence.

Best regards.
Jiadong. Lu

Jiadong Lu wrote on Mon, May 20, 2024 at 14:42:
>
> Hi, Phil
>
> I don't have much expertise with the flink-python module. But the error
> you have is a familiar error if you have written some code to handle
> directory paths.
>
> The correct forms of Path/URI are:
> 1. "/home/foo"
> 2. "file:///home/foo/boo"
> 3. "hdfs:///home/foo/boo"
> 4. or Win32 directory form
>
> Best regards,
> Jiadong Lu
>
> On 2024/5/20 02:28, Phil Stavridis wrote:
> > Hi Lu,
> >
> > Thanks for your reply. In what way are the paths to be passed to the job
> > that needs to use the checkpoint? Is the standard way using -s :/
> > or passing the path in the module as a Python arg?
> >
> > Kind regards
> > Phil
> >
> >> On 18 May 2024, at 03:19, jiadong.lu  wrote:
> >>
> >> Hi Phil,
> >>
> >> AFAIK, the error indicated your path was incorrect.
> >> You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
> >> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> >>
> >> Best.
> >> Jiadong.Lu
> >>
> >> On 5/18/24 2:37 AM, Phil Stavridis wrote:
> >>> Hi,
> >>> I am trying to test how the checkpoints work for restoring state, but not 
> >>> sure how to run a new instance of a flink job, after I have cancelled it, 
> >>> using the checkpoints which I store in the filesystem of the job manager, 
> >>> e.g. /opt/flink/checkpoints.
> >>> I have tried passing the checkpoint as an argument to the function and
> >>> using it when setting the checkpoint, but it looks like the way it is done
> >>> is something like below:
> >>> docker-compose exec jobmanager flink run -s 
> >>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py 
> >>> /opt/app/flink_job.py
> >>> But I am getting the error:
> >>> Caused by: java.io.IOException: Checkpoint/savepoint path 
> >>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid 
> >>> file URI. Either the pointer path is invalid, or the checkpoint was 
> >>> created by a different state backend.
> >>> What is wrong with the way the job is re-submitted to the cluster?
> >>> Kind regards
> >>> Phil
> >


Re: Restore from checkpoint

2024-05-20 Thread Jiadong Lu

Hi, Phil

I don't have much expertise with the flink-python module. But the error
you have is a familiar error if you have written some code to handle
directory paths.


The correct forms of Path/URI are (a short example follows the list):
1. "/home/foo"
2. "file:///home/foo/boo"
3. "hdfs:///home/foo/boo"
4. or Win32 directory form
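
For example, here is roughly how those forms look when passed to Flink (a minimal sketch only; the directories and job file below are made-up placeholders, not paths from your setup):

# plain absolute path, treated as a local file path
flink run -s /home/foo/chk-12 -py my_job.py
# explicit local file URI
flink run -s file:///home/foo/boo/chk-12 -py my_job.py
# HDFS URI (typically needs the Hadoop filesystem dependencies available to Flink)
flink run -s hdfs:///home/foo/boo/chk-12 -py my_job.py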

Best regards,
Jiadong Lu

On 2024/5/20 02:28, Phil Stavridis wrote:

Hi Lu,

Thanks for your reply. In what way are the paths to be passed to the job that needs
to use the checkpoint? Is the standard way using -s :/ or passing
the path in the module as a Python arg?

Kind regards
Phil


On 18 May 2024, at 03:19, jiadong.lu  wrote:

Hi Phil,

AFAIK, the error indicated your path was incorrect.
You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.

Best.
Jiadong.Lu

On 5/18/24 2:37 AM, Phil Stavridis wrote:

Hi,
I am trying to test how the checkpoints work for restoring state, but not sure 
how to run a new instance of a flink job, after I have cancelled it, using the 
checkpoints which I store in the filesystem of the job manager, e.g. 
/opt/flink/checkpoints.
I have tried passing the checkpoint as an argument to the function and using it
when setting the checkpoint, but it looks like the way it is done is something
like below:
docker-compose exec jobmanager flink run -s 
:/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py
But I am getting the error:
Caused by: java.io.IOException: Checkpoint/savepoint path 
':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file URI. 
Either the pointer path is invalid, or the checkpoint was created by a 
different state backend.
What is wrong with the way the job is re-submitted to the cluster?
Kind regards
Phil




Re: Restore from checkpoint

2024-05-19 Thread Jinzhong Li
Hi Phil,

I think you can use the "-s :checkpointMetaDataPath" arg to resume the job
from a retained checkpoint [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
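
As far as I can tell, the leading ':' in that docs snippet is only placeholder notation and is not meant to be typed literally; the value for -s should be the filesystem path (or file:// URI) of the retained checkpoint's metadata, which normally sits under <checkpoint-dir>/<job-id>/chk-<n>/. A rough, untested sketch (the <job-id> and chk-<n> parts are placeholders for whatever is actually on disk):

bin/flink run -s /opt/flink/checkpoints/<job-id>/chk-42 -py /opt/app/flink_job.py
# or point at the metadata file itself with an explicit file URI
bin/flink run -s file:///opt/flink/checkpoints/<job-id>/chk-42/_metadata -py /opt/app/flink_job.py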

Best,
Jinzhong Li


On Mon, May 20, 2024 at 2:29 AM Phil Stavridis  wrote:

> Hi Lu,
>
> Thanks for your reply. In what way are the paths to be passed to the job
> that needs to use the checkpoint? Is the standard way using -s :/
> or passing the path in the module as a Python arg?
>
> Kind regards
> Phil
>
> > On 18 May 2024, at 03:19, jiadong.lu  wrote:
> >
> > Hi Phil,
> >
> > AFAIK, the error indicated your path was incorrect.
> > You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or
> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> >
> > Best.
> > Jiadong.Lu
> >
> > On 5/18/24 2:37 AM, Phil Stavridis wrote:
> >> Hi,
> >> I am trying to test how the checkpoints work for restoring state, but
> not sure how to run a new instance of a flink job, after I have cancelled
> it, using the checkpoints which I store in the filesystem of the job
> manager, e.g. /opt/flink/checkpoints.
> >> I have tried passing the checkpoint as an argument to the function and
> using it when setting the checkpoint, but it looks like the way it is done is
> something like below:
> >> docker-compose exec jobmanager flink run -s
> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py
> /opt/app/flink_job.py
> >> But I am getting the error:
> >> Caused by: java.io.IOException: Checkpoint/savepoint path
> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file
> URI. Either the pointer path is invalid, or the checkpoint was created by a
> different state backend.
> >> What is wrong with the way the job is re-submitted to the cluster?
> >> Kind regards
> >> Phil
>
>


Re: Restore from checkpoint

2024-05-19 Thread Phil Stavridis
Hi Lu,

Thanks for your reply. In what way are the paths to be passed to the job that
needs to use the checkpoint? Is the standard way using -s :/ or
passing the path in the module as a Python arg?

Kind regards
Phil

> On 18 May 2024, at 03:19, jiadong.lu  wrote:
> 
> Hi Phil,
> 
> AFAIK, the error indicated your path was incorrect.
> You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> 
> Best.
> Jiadong.Lu
> 
> On 5/18/24 2:37 AM, Phil Stavridis wrote:
>> Hi,
>> I am trying to test how the checkpoints work for restoring state, but not 
>> sure how to run a new instance of a flink job, after I have cancelled it, 
>> using the checkpoints which I store in the filesystem of the job manager, 
>> e.g. /opt/flink/checkpoints.
>> I have tried passing the checkpoint as an argument to the function and using
>> it when setting the checkpoint, but it looks like the way it is done is
>> something like below:
>> docker-compose exec jobmanager flink run -s 
>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py 
>> /opt/app/flink_job.py
>> But I am getting the error:
>> Caused by: java.io.IOException: Checkpoint/savepoint path 
>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file 
>> URI. Either the pointer path is invalid, or the checkpoint was created by a 
>> different state backend.
>> What is wrong with the way the job is re-submitted to the cluster?
>> Kind regards
>> Phil



Re: Restore from checkpoint

2024-05-17 Thread jiadong.lu

Hi Phil,

AFAIK, the error indicated your path was incorrect.
You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
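
With the command from your mail, that would look something like this (an untested sketch that just reuses your container paths without the leading colon):

docker-compose exec jobmanager flink run -s /opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py
# or with an explicit file URI
docker-compose exec jobmanager flink run -s file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py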


Best.
Jiadong.Lu

On 5/18/24 2:37 AM, Phil Stavridis wrote:

Hi,

I am trying to test how the checkpoints work for restoring state, but not sure 
how to run a new instance of a flink job, after I have cancelled it, using the 
checkpoints which I store in the filesystem of the job manager, e.g. 
/opt/flink/checkpoints.

I have tried passing the checkpoint as an argument to the function and using it
when setting the checkpoint, but it looks like the way it is done is something
like below:


docker-compose exec jobmanager flink run -s 
:/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py

But I am getting the error:
Caused by: java.io.IOException: Checkpoint/savepoint path 
':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file URI. 
Either the pointer path is invalid, or the checkpoint was created by a 
different state backend.

What is wrong with the way the job is re-submitted to the cluster?

Kind regards
Phil


Re: Restore from Checkpoint from local Standalone Job

2021-03-29 Thread Piotr Nowojski
Hi Sandeep,

I think it should work fine with `StandaloneCompletedCheckpointStore`.

Have you checked if your directory /Users/test/savepoint is being
populated in the first place? And if so, whether the restarted job is
throwing any exceptions, e.g. that it cannot access those files?

Also note that cancel with savepoint is deprecated and you should be using
stop with savepoint [1].

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#terminating-a-job
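
For example (a sketch only; <job-id> stands for the id that `flink list` prints for your running job):

bin/flink stop --savepointPath /Users/test/savepoint <job-id>
# the stop command prints the savepoint location; then resume with
# bin/flink run -s <printed-savepoint-path> ...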

On Fri, Mar 26, 2021 at 18:55 Sandeep khanzode wrote:

> Hello
>
>
> I was reading this:
> https://stackoverflow.com/questions/61010970/flink-resume-from-externalised-checkpoint-question
>
>
> I am trying to run a standalone job on my local machine with a single job manager
> and task manager.
>
>
>
> I have enabled checkpointing as below:
>
> env.setStateBackend(new RocksDBStateBackend("file:///Users/test/checkpoint", 
> true));
>
> env.enableCheckpointing(30 * 1000);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
>
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
> After I stop my job (I also tried to cancel the job using bin/flink cancel
> -s /Users/test/savepoint ), I tried to start the same job using…
>
> ./standalone-job.sh start-foreground test.jar --job-id 
> --job-classname com.test.MyClass --fromSavepoint /Users/test/savepoint
>
>
> But it never restores the state, and always starts afresh.
>
>
> In Flink, I see this:
>
> StandaloneCompletedCheckpointStore
>
> * {@link CompletedCheckpointStore} for JobManagers running in {@link 
> HighAvailabilityMode#NONE}.
>
> public void recover() throws Exception {
>
> // Nothing to do
> }
>
>
> Does this have something to do with not being able to restore state?
>
> Does this need ZooKeeper or K8s HA to function?
>
>
> Thanks,
> Sandeep
>
>