Hi,
We have a Flink job (Flink version 1.6.1) that unions two streams and
passes them through a custom KeyedProcessFunction backed by a RocksDB state
store; the result is written as another stream to Kafka. The current
checkpoint size is about 100 GB. Checkpoints are saved to S3 at a 5-minute
interval with incremental checkpointing enabled, and most of them finish in
less than 1 minute. We are running this job on YARN with the following
parameters:

-yn 10  (10 task managers)
-ytm 2048 (2 GB each)
- Operator parallelism is also 10.

When we try to take a savepoint of this job, it runs for about 10 minutes
and then fails with the error below. It looks like the default checkpoint
timeout of 10 minutes is the cause. What is the recommended way to take a
savepoint for such a job? Should we increase the checkpoint timeout beyond
the 10-minute default? Also, our state size is currently 100 GB, but it is
expected to grow up to 1 TB. Is Flink a good fit for use cases with that
much state? How long should a savepoint be expected to take with this state
size and parallelism on YARN? Any other recommendations would be of great
help.

org.apache.flink.util.FlinkException: Triggering a savepoint for the job 434398968e635a49329f59a019b41b6f failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
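
To make the timing question more concrete, here is a rough back-of-envelope
sketch of how we are thinking about it. It assumes that a savepoint writes
the full state (not an incremental delta), that state is spread evenly
across the parallel subtasks, and that each subtask uploads at a fixed
rate. The class name, the method name and the 20 MB/s per-subtask rate are
purely illustrative assumptions, not measurements from our cluster.

```java
public class SavepointEstimate {

    /**
     * Rough lower bound, in seconds, for writing a full snapshot, assuming
     * state is evenly distributed and every subtask writes in parallel at
     * the same sustained rate.
     */
    static double estimateSeconds(double stateGb, int parallelism, double mbPerSecPerTask) {
        double mbPerTask = stateGb * 1024.0 / parallelism; // share of state per subtask
        return mbPerTask / mbPerSecPerTask;
    }

    public static void main(String[] args) {
        double timeoutSec = 10 * 60;   // the 10-minute default checkpoint timeout
        int parallelism = 10;          // operator parallelism from our setup
        double assumedRateMb = 20.0;   // hypothetical upload rate per subtask (MB/s)

        // Current state size (100 GB) and the expected future size (1 TB).
        for (double stateGb : new double[] {100, 1024}) {
            double sec = estimateSeconds(stateGb, parallelism, assumedRateMb);
            System.out.printf("%.0f GB -> about %.0f s (%s the timeout)%n",
                    stateGb, sec, sec > timeoutSec ? "exceeds" : "within");
        }
    }
}
```

If an estimate like this ends up above the timeout, our understanding is
that the limit would have to be raised in the job itself, for example via
env.getCheckpointConfig().setCheckpointTimeout(...), since savepoints appear
to be subject to the same timeout as checkpoints.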