RE: RE: checkpointing seems to be throttled.
FYI, this was an EFS issue. I originally dismissed EFS as the cause because the Percent I/O limit metric was very low, but I later noticed that throughput utilization was very high. We increased the provisioned throughput, and checkpoint times are greatly reduced.

From: Colletta, Edward
Sent: Monday, December 21, 2020 12:32 PM
To: Yun Gao; user@flink.apache.org
Subject: RE: RE: checkpointing seems to be throttled.

Doh! Yeah, we set the state backend in code, and I read the flink-conf.yaml file and use the high-availability storage dir.

From: Yun Gao <yungao...@aliyun.com>
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.

Hi Edward,

Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code, it requires a path parameter, and that path is used as the checkpoint directory (state.checkpoints.dir).
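Edward describes setting the backend in code and taking its path from the high-availability storage directory in flink-conf.yaml. A minimal, stdlib-only sketch of that lookup step follows. It assumes the file holds flat "key: value" lines with "#" comments, which is the usual flink-conf.yaml layout; the class name ConfCheckpointPath is hypothetical, and the parser is a simplification, not Flink's own configuration loader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: pulls the checkpoint path out of flink-conf.yaml text.
// Simplified parser: one "key: value" pair per line, '#' starts a comment.
final class ConfCheckpointPath {

    static final String HA_STORAGE_DIR = "high-availability.storageDir";

    // Parses flat "key: value" lines into a map; blank and comment-only lines are skipped.
    static Map<String, String> parse(String yamlText) {
        Map<String, String> conf = new HashMap<>();
        for (String rawLine : yamlText.split("\n")) {
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // blank, comment-only, or no "key: value" separator
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    // Returns the HA storage dir to pass as the FsStateBackend path, or fails loudly.
    static String checkpointPath(String yamlText) {
        String dir = parse(yamlText).get(HA_STORAGE_DIR);
        if (dir == null || dir.isEmpty()) {
            throw new IllegalStateException(HA_STORAGE_DIR + " is not set in flink-conf.yaml");
        }
        return dir;
    }
}
```

In the job itself, the returned string would be handed to the FsStateBackend(String) constructor and registered with env.setStateBackend(...). Failing loudly when the key is missing mirrors the point that the code route needs an explicit path; any mount path such as file:///mnt/efs/flink/ha is only an example.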
If via flink-conf.yaml: I tried it on 1.12 by setting state.backend: filesystem in the config file and enabling checkpointing, and it indeed threw an exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
    at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
    at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:237)
    at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
    at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
    at CheckpointTest.main(CheckpointTest.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
    ... 11 more

For the timeout: if there is no backpressure, it might help to look at the time breakdown of the checkpoint on the checkpoint history page in the web UI to see which phase takes too long.

Best,
Yun

--Original Mail --
Sender: Colletta, Edward <edward.colle...@fmr.com>
Send Date: Tue Dec 22 00:04:03 2020
Recipients: Yun Gao <yungao...@aliyun.com>, user@flink.apache.or
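Yun's advice for the timeout is to look past the end-to-end duration and find which phase of the checkpoint dominates. A rough sketch of that triage follows, assuming the per-phase timings for a slow subtask have been copied off the checkpoint details in the web UI. The phase labels are loosely modeled on the UI's Start Delay, Alignment Duration, Sync Duration and Async Duration columns; the class, its methods, and the sample numbers are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical triage helper: given per-phase durations (ms) for one subtask's
// checkpoint, report which phase accounts for the most time.
final class CheckpointPhases {

    static Map<String, Long> phases(long startDelayMs, long alignmentMs, long syncMs, long asyncMs) {
        Map<String, Long> m = new LinkedHashMap<>(); // keeps display order
        m.put("start delay", startDelayMs); // time until the barrier reached the subtask
        m.put("alignment", alignmentMs);    // waiting for barriers on all inputs
        m.put("sync", syncMs);              // synchronous part of the snapshot
        m.put("async", asyncMs);            // asynchronous part, e.g. writing to checkpoint storage
        return m;
    }

    // Largest phase; on a tie the earlier phase in insertion order wins.
    static String dominantPhase(Map<String, Long> phaseMs) {
        String worst = null;
        long worstMs = -1;
        for (Map.Entry<String, Long> e : phaseMs.entrySet()) {
            if (e.getValue() > worstMs) {
                worst = e.getKey();
                worstMs = e.getValue();
            }
        }
        return worst;
    }

    // Share of one phase in the sum of all listed phases, as a percentage.
    static double sharePercent(Map<String, Long> phaseMs, String phase) {
        Long ms = phaseMs.get(phase);
        if (ms == null) {
            throw new IllegalArgumentException("unknown phase: " + phase);
        }
        long total = 0;
        for (long v : phaseMs.values()) {
            total += v;
        }
        return total == 0 ? 0.0 : 100.0 * ms / total;
    }
}
```

As a rule of thumb, a large async share points at the write to checkpoint storage (for FsStateBackend, the configured file system), while a large start delay or alignment share usually points back at backpressure or slow barrier propagation.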
RE: RE: checkpointing seems to be throttled.
Doh! Yeah, we set the state backend in code, and I read the flink-conf.yaml file and use the high-availability storage dir.

--Original Mail --
Sender: Colletta, Edward <edward.colle...@fmr.com>
Send Date: Tue Dec 22 00:04:03 2020
Recipients: Yun Gao <yungao...@aliyun.com>, user@flink.apache.org
Subject: RE: checkpointing seems to be throttled.

Thanks for the quick response. We are using FsStateBackend, and I did see checkpoint files and directories in the EFS-mounted directory. We do monitor backpressure through the REST API periodically, and we do not see any.

From: Yun Gao <yungao...@aliyun.com>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
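The resolution at the top of the thread hinged on two different EFS views: Percent I/O limit stayed low while throughput utilization was high. Roughly, the first tracks the rate of I/O operations and the second tracks bytes per second against the throughput the file system is permitted, so checkpoints that write a few large files can saturate throughput while using little of the operations budget. A back-of-the-envelope sketch of the throughput side follows, assuming utilization is the average observed throughput over a period divided by the permitted throughput; treat the formula as an approximation rather than AWS's exact metric math, and the class name and numbers as hypothetical.

```java
// Hypothetical back-of-the-envelope helpers for file-system throughput headroom.
// Assumption: utilization = average observed throughput / permitted throughput.
final class ThroughputHeadroom {

    private static final double MIB = 1024.0 * 1024.0;

    // Percent of permitted throughput used, given the bytes moved over a period.
    static double utilizationPercent(long bytesInPeriod, long periodSeconds, double permittedMiBps) {
        if (periodSeconds <= 0 || permittedMiBps <= 0) {
            throw new IllegalArgumentException("period and permitted throughput must be positive");
        }
        double observedMiBps = bytesInPeriod / MIB / periodSeconds;
        return 100.0 * observedMiBps / permittedMiBps;
    }

    // Provisioned throughput (MiB/s) that would put the same load at targetPercent utilization.
    static double provisionedMiBpsFor(double observedMiBps, double targetPercent) {
        if (observedMiBps < 0 || targetPercent <= 0 || targetPercent > 100) {
            throw new IllegalArgumentException("need observedMiBps >= 0 and 0 < targetPercent <= 100");
        }
        return observedMiBps * 100.0 / targetPercent;
    }
}
```

For example, 48 MiB/s of sustained checkpoint writes against 50 MiB/s permitted is 96% utilization, and provisioning 80 MiB/s would bring the same load to 60%. Raising provisioned throughput is the lever Edward describes pulling.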
Re: RE: checkpointing seems to be throttled.
Hi Edward,

Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code, it requires a path parameter, and that path is used as the checkpoint directory (state.checkpoints.dir). If via flink-conf.yaml: I tried it on 1.12 by setting state.backend: filesystem in the config file and enabling checkpointing, and it indeed threw an exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'

For the timeout: if there is no backpressure, it might help to look at the time breakdown of the checkpoint on the checkpoint history page in the web UI to see which phase takes too long.

Best,
Yun

From: Yun Gao
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.

Hi Edward,

For the second issue, have you also set the state backend type? I ask because, except for the default heap state backend, the other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all the snapshots in the JobManager's memory, they cannot be recovered after a JobManager failover, which makes it unsuitable for production use. Therefore, if it is used in a production environment, it might be better to switch