Re: Flink k8 HA mode + checkpoint management

2021-08-05 Thread Yang Wang
FLINK-19358 [1] might be related, and we already have some discussion there.

[1]. https://issues.apache.org/jira/browse/FLINK-19358

Best,
Yang

Yun Tang wrote on Wednesday, August 4, 2021 at 11:50 AM:

> Hi Harsh,
>
> The job id would be fixed as  if using HA
> mode with native k8s, which means the checkpoint path should stay the same
> no matter how many times you submit.
> However, if HA mode is enabled, the new job would first consult the HA
> checkpoint store to recover the latest checkpoint. In other words, your
> new job should resume from the last checkpoint, checkpoint-1. From your
> exceptions, we can see the job did not recover successfully and started
> from scratch. That's why you hit the exception saying the checkpoint
> metadata file already exists.
>
> There are two possible reasons for this:
>
>1. The HA checkpoint store did not recover successfully; you could
>check whether checkpoint 1 completed in the previous run.
>2. The last checkpoint (checkpoint-1) was written to the remote
>checkpoint path but failed to be added to the checkpoint store.
>However, the checkpoint coordinator would clean up the checkpoint
>metadata if it failed to be added to the checkpoint store [1], unless
>your job crashed or hit the PossibleInconsistentStateException [2].
>
> I think you should check the jobmanager log of your last run to find the
> root cause.
>
> [1]
> https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1233
> [2]
> https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1226
>
>
> Best
> Yun Tang
> --
> *From:* Manong Karl 
> *Sent:* Wednesday, August 4, 2021 9:17
> *To:* Harsh Shah 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink k8 HA mode + checkpoint management
>
> Can you please share your configs? I'm using native Kubernetes without HA
> and there are no issues. I'm curious how this happens. AFAIK the job ID is
> generated randomly.
>
>
> Harsh Shah wrote on Wednesday, August 4, 2021 at 2:44 AM:
>
> Hello,
>
> I am trying to use Flink HA mode inside kubernetes
> <https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/>
>  in standalone
> <https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#application-mode>
>  mode.
> The Job ID is always constant, "". In
> situations where we restart the job (not from a checkpoint or savepoint),
> we see errors like
> """
>
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> '/flink-checkpoints//chk-1/_metadata' 
> already exists
>
> """
> where checkpoints have not been created since the restart of Job .
>
> My questions:
> * Is the recommended approach to set a new unique "checkpoint path" every
> time we update the job and restart the necessary k8s resources (when not
> restarting from a checkpoint or savepoint)? Or to GC the checkpoints during
> deletion and reload from a savepoint if required? Looking for a standard
> recommendation.
> * Is there a way I can override the JobID so that it is unique and indicates
> a complete restart in HA mode?
>
>
> Thanks,
> Harsh
>
>


Re: Flink k8 HA mode + checkpoint management

2021-08-03 Thread Yun Tang
Hi Harsh,

The job id would be fixed as  if using HA mode 
with native k8s, which means the checkpoint path should stay the same no matter 
how many times you submit.
However, if HA mode is enabled, the new job would first consult the HA
checkpoint store to recover the latest checkpoint. In other words, your new job
should resume from the last checkpoint, checkpoint-1. From your exceptions, we
can see the job did not recover successfully and started from scratch. That's
why you hit the exception saying the checkpoint metadata file already exists.

There are two possible reasons for this:

  1.  The HA checkpoint store did not recover successfully; you could check
whether checkpoint 1 completed in the previous run (a small probe sketch is
included after the references below).
  2.  The last checkpoint (checkpoint-1) was written to the remote checkpoint
path but failed to be added to the checkpoint store. However, the checkpoint
coordinator would clean up the checkpoint metadata if it failed to be added to
the checkpoint store [1], unless your job crashed or hit the
PossibleInconsistentStateException [2].

I think you should check the jobmanager log of your last run to find the root
cause.

[1] 
https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1233
[2] 
https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1226
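
To make reason 1 a bit more concrete, here is a minimal sketch (the checkpoint
base directory and job id below are placeholders to substitute) that simply
probes whether chk-1 left a _metadata file behind on the checkpoint filesystem;
that is the same file the FileAlreadyExistsException complains about:

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class CheckpointMetadataProbe {
    public static void main(String[] args) throws Exception {
        // Placeholders: substitute your checkpoint base directory and job id.
        String checkpointsDir = "file:///flink-checkpoints";
        String jobId = "<job-id>";

        // A completed checkpoint leaves a _metadata file under chk-<n>/;
        // its presence tells you whether checkpoint 1 finished in the
        // previous run.
        Path metadata = new Path(checkpointsDir + "/" + jobId + "/chk-1/_metadata");
        FileSystem fs = metadata.getFileSystem();

        System.out.println("chk-1/_metadata exists: " + fs.exists(metadata));
    }
}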


Best
Yun Tang

From: Manong Karl 
Sent: Wednesday, August 4, 2021 9:17
To: Harsh Shah 
Cc: user@flink.apache.org 
Subject: Re: Flink k8 HA mode + checkpoint management

Can you please share your configs? I'm using native Kubernetes without HA and
there are no issues. I'm curious how this happens. AFAIK the job ID is
generated randomly.


Harsh Shah <harsh.a.s...@shopify.com> wrote on Wednesday, August 4, 2021 at
2:44 AM:
Hello,

I am trying to use Flink HA mode inside 
kubernetes<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/>
 in 
standalone<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#application-mode>
 mode. The Job ID is always constant, "". In 
situations where we restart the job (not from a checkpoint or savepoint), we
see errors like
"""

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
'/flink-checkpoints//chk-1/_metadata' 
already exists

"""
where checkpoints have not been created since the restart of Job .

My questions:
* Is the recommended approach to set a new unique "checkpoint path" every time
we update the job and restart the necessary k8s resources (when not restarting
from a checkpoint or savepoint)? Or to GC the checkpoints during deletion and
reload from a savepoint if required? Looking for a standard recommendation.
* Is there a way I can override the JobID so that it is unique and indicates a
complete restart in HA mode?


Thanks,
Harsh


Re: Flink k8 HA mode + checkpoint management

2021-08-03 Thread Manong Karl
Can you please share your configs? I'm using native Kubernetes without HA
and there are no issues. I'm curious how this happens. AFAIK the job ID is
generated randomly.
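
For illustration, a minimal sketch of that difference (the pinned id below is a
hypothetical value, purely for illustration): without HA a fresh random JobID
is generated per submission, while the HA application deployment pins a
constant JobID, so the per-job checkpoint path is reused across submissions.

import org.apache.flink.api.common.JobID;

public class JobIdSketch {
    public static void main(String[] args) {
        // Without HA: a freshly generated, random job id per submission.
        JobID randomId = JobID.generate();

        // With a pinned id (hypothetical value, for illustration only):
        // the checkpoint directory <checkpoints-dir>/<job-id>/chk-<n>/
        // is keyed by this id, so a constant id means the same path is
        // reused on every submission.
        JobID fixedId = new JobID(0L, 0L);

        System.out.println("random: " + randomId + ", fixed: " + fixedId);
    }
}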


Harsh Shah wrote on Wednesday, August 4, 2021 at 2:44 AM:

> Hello,
>
> I am trying to use Flink HA mode inside kubernetes
> <https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/>
>  in standalone
> <https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#application-mode>
>  mode.
> The Job ID is always constant, "". In
> situations where we restart the job (not from a checkpoint or savepoint),
> we see errors like
> """
>
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> '/flink-checkpoints//chk-1/_metadata' 
> already exists
>
> """
> where checkpoints have not been created since the restart of Job .
>
> My questions:
> * Is the recommended approach to set a new unique "checkpoint path" every
> time we update the job and restart the necessary k8s resources (when not
> restarting from a checkpoint or savepoint)? Or to GC the checkpoints during
> deletion and reload from a savepoint if required? Looking for a standard
> recommendation.
> * Is there a way I can override the JobID so that it is unique and indicates
> a complete restart in HA mode?
>
>
> Thanks,
> Harsh
>


Flink k8 HA mode + checkpoint management

2021-08-03 Thread Harsh Shah
Hello,

I am trying to use Flink HA mode inside kubernetes
<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/>
 in standalone
<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#application-mode>
mode.
The Job ID is always constant, "". In
situations where we restart the job (not from a checkpoint or savepoint),
we see errors like
"""

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException:
'/flink-checkpoints//chk-1/_metadata'
already exists

"""
where checkpoints have not been created since the restart of Job .

My questions:
* Is the recommended approach to set a new unique "checkpoint path" every time
we update the job and restart the necessary k8s resources (when not restarting
from a checkpoint or savepoint)? Or to GC the checkpoints during deletion and
reload from a savepoint if required? Looking for a standard recommendation.
(A rough sketch of what I mean by the first option follows below.)
* Is there a way I can override the JobID so that it is unique and indicates a
complete restart in HA mode?
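
Roughly, something like the following (DEPLOY_ID, the bucket path, and the
placeholder pipeline are made-up for illustration only; the same
state.checkpoints.dir value could equally be set per deployment in
flink-conf.yaml):

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UniqueCheckpointPathJob {
    public static void main(String[] args) throws Exception {
        // Hypothetical per-deployment identifier injected by the deploy
        // pipeline (e.g. an image tag or git SHA), so a fresh deployment
        // writes its checkpoints under its own prefix instead of colliding
        // with chk-* directories left by the previous run.
        String deployId = System.getenv().getOrDefault("DEPLOY_ID", "dev");

        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
                "s3://my-bucket/flink-checkpoints/" + deployId);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000);

        // Placeholder pipeline, just to make the sketch runnable.
        env.fromSequence(0, 1_000_000).print();

        env.execute("unique-checkpoint-path-per-deployment");
    }
}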


Thanks,
Harsh