Great, thanks for sharing that info.

Gagan

On Thu, Nov 1, 2018 at 1:50 PM Yun Tang <myas...@live.com> wrote:

> Haha, actually externalized checkpoints also support parallelism changes.
> You can read my email
> <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Why-documentation-always-say-checkpoint-does-not-support-Flink-specific-features-like-rescaling-td23982.html>
> posted to the dev mailing list.
>
> Best
> Yun Tang
> ------------------------------
> *From:* Gagan Agrawal <agrawalga...@gmail.com>
> *Sent:* Thursday, November 1, 2018 13:38
> *To:* myas...@live.com
> *Cc:* happydexu...@gmail.com; user@flink.apache.org
> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
> completing"
>
> Thanks Yun for your inputs. Yes, increasing the checkpoint timeout helps and
> we are able to take savepoints now. In our case we wanted to increase
> parallelism, so I believe a savepoint is the only option, as checkpoints
> don't support code/parallelism changes.
>
> Gagan
>
> On Wed, Oct 31, 2018 at 8:46 PM Yun Tang <myas...@live.com> wrote:
>
> Hi Gagan
>
> A savepoint generally takes more time than a usual incremental
> checkpoint. You could try increasing the checkpoint timeout [1]:
>
>    // Timeout is in milliseconds: 900000 ms = 15 minutes (default is 10 minutes).
>    env.getCheckpointConfig().setCheckpointTimeout(900000);
>
> If you just want to resume from the previous job without changing the
> state backend, I think you could also try resuming from a retained
> checkpoint without triggering a savepoint [2].
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Gagan Agrawal <agrawalga...@gmail.com>
> *Sent:* Wednesday, October 31, 2018 19:03
> *To:* happydexu...@gmail.com
> *Cc:* user@flink.apache.org
> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
> completing"
>
> Hi Henry,
> Thanks for your response. However, we don't face this issue during normal
> runs, as we have incremental checkpoints. We face this problem only when we
> try to take a savepoint (which tries to save the entire state in one go).
>
> Gagan
>
> On Wed, Oct 31, 2018 at 11:41 AM 徐涛 <happydexu...@gmail.com> wrote:
>
> Hi Gagan,
>         I have run into the checkpoint timeout error too.
>         In my case, it was not due to a large checkpoint size but due to a slow
> sink, which caused high backpressure on the upstream operators. The barrier
> may then take a long time to arrive at the sink.
>         Please check whether this is the case for you.
>
> Best
> Henry
>
> > On Oct 30, 2018, at 6:07 PM, Gagan Agrawal <agrawalga...@gmail.com> wrote:
> >
> > Hi,
> > We have a Flink job (Flink version 1.6.1) which unions 2 streams and passes
> > them through a custom KeyedProcessFunction with a RocksDB state store, which
> > finally writes another stream into Kafka. The current checkpoint size is
> > around 100 GB. Checkpoints are saved to S3 at a 5-minute interval with
> > incremental checkpoints enabled, and they mostly finish in less than 1 minute.
> > We are running this job on YARN with the following parameters:
> >
> > -yn 10  (10 task managers)
> > -ytm 2048 (2 GB each)
> > - Operator parallelism is also 10.
> >
> > While trying to run a savepoint on this job, it runs for ~10 mins and then
> > throws the following error. It looks like the default checkpoint timeout of
> > 10 mins is causing this. What is the recommended way to run a savepoint for
> > such a job? Should we increase the default checkpoint timeout of 10 mins?
> > Also, our state size is currently 100 GB but is expected to grow up to 1 TB.
> > Is Flink good for use cases with that much state? And how much time is a
> > savepoint expected to take with such a state size and parallelism on YARN?
> > Any other recommendations would be of great help.
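
As a rough way to reason about the timeout question above, here is a back-of-the-envelope sketch in plain Java (no Flink dependency). It assumes the savepoint writes the full state, that state is spread evenly across subtasks, and that upload throughput is the only bottleneck, so it ignores backpressure and barrier alignment. The class and method names are hypothetical and are not part of any Flink API. Under those assumptions, 100 GB at parallelism 10 needs roughly 17 MB/s per subtask to fit in the 10-minute default, and 1 TB would need roughly 175 MB/s.

```java
import java.time.Duration;

public class SavepointBudget {

    /**
     * Minimum sustained write rate (MB/s) each subtask needs so that a full
     * snapshot of stateGb finishes within the given timeout.
     * Assumes state is evenly distributed over the subtasks.
     */
    static double requiredMbPerSecondPerSubtask(double stateGb, int parallelism, Duration timeout) {
        double mbPerSubtask = stateGb * 1024.0 / parallelism;
        return mbPerSubtask / timeout.getSeconds();
    }

    /**
     * Estimated snapshot duration for a given per-subtask write rate (MB/s),
     * rounded up to whole seconds. Same even-distribution assumption.
     */
    static Duration estimatedDuration(double stateGb, int parallelism, double mbPerSecondPerSubtask) {
        double seconds = stateGb * 1024.0 / parallelism / mbPerSecondPerSubtask;
        return Duration.ofSeconds((long) Math.ceil(seconds));
    }

    public static void main(String[] args) {
        Duration defaultTimeout = Duration.ofMinutes(10); // default mentioned in the thread
        Duration raisedTimeout = Duration.ofMillis(900_000); // value suggested in the thread

        System.out.printf("100 GB, p=10, 10 min: %.1f MB/s per subtask%n",
                requiredMbPerSecondPerSubtask(100, 10, defaultTimeout));
        System.out.printf("100 GB, p=10, 15 min: %.1f MB/s per subtask%n",
                requiredMbPerSecondPerSubtask(100, 10, raisedTimeout));
        System.out.printf("1 TB, p=10, 10 min: %.1f MB/s per subtask%n",
                requiredMbPerSecondPerSubtask(1024, 10, defaultTimeout));
        System.out.println("100 GB, p=10 at 10 MB/s: "
                + estimatedDuration(100, 10, 10.0).getSeconds() + " s");
    }
}
```

If the measured per-subtask throughput to S3 is below the required rate, either a longer timeout or a higher parallelism would be needed for the savepoint to complete; the real numbers depend on the cluster and should be measured rather than taken from this estimate.
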
> >
> > org.apache.flink.util.FlinkException: Triggering a savepoint for the job 434398968e635a49329f59a019b41b6f failed.
> >       at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
> >       at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
> >       at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
> >       at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
> >       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
> >       at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> >       at java.security.AccessController.doPrivileged(Native Method)
> >       at javax.security.auth.Subject.doAs(Subject.java:422)
> >       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> > Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
> >       at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
> >       at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> >       at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> >       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >       at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
> >       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
> >       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >       at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
> >       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >       at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> >       at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>
>
