RE: RE: checkpointing seems to be throttled.

2020-12-24 Thread Colletta, Edward
FYI, this was an EFS issue.  I originally dismissed EFS being the issue because 
the Percent I/O limit metric  was very low.  But I later noticed the throughput 
utilization was very high.  We increased the provisioned throughput and the 
checkpoint times are greatly reduced.

From: Colletta, Edward
Sent: Monday, December 21, 2020 12:32 PM
To: Yun Gao ; user@flink.apache.org
Subject: RE: RE: checkpointing seems to be throttled.

Doh!   Yeah, we set the state backend in code and I read the flink-conf.yaml 
file and use the high-availability storage dir.


From: Yun Gao mailto:yungao...@aliyun.com>>
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward mailto:edward.colle...@fmr.com>>; 
user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: RE: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

   Are you setting the FSStateBackend via code or flink-conf.yaml ? If via code 
it requires a path parameter and the path would be the state.checkpoint.dir. If 
via flink-conf.yaml, I tried on 1.12 by setting   state.backend: filesystem in 
config file and enable checkpoint, it indeed threw an exception said

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there are no backpressure, I think it might be helpful 
to see the time decompostion for the checkpoint in the checkpoint history page 
in WEB UI to see which phase takes too long time.


Best,
 Yun


--Original Mail --
Sender:Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao mailto:yungao...@aliyun.com>>, 
user@flink.apache.or

RE: RE: checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Doh!   Yeah, we set the state backend in code and I read the flink-conf.yaml 
file and use the high-availability storage dir.


From: Yun Gao 
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

   Are you setting the FSStateBackend via code or flink-conf.yaml ? If via code 
it requires a path parameter and the path would be the state.checkpoint.dir. If 
via flink-conf.yaml, I tried on 1.12 by setting   state.backend: filesystem in 
config file and enable checkpoint, it indeed threw an exception said

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there are no backpressure, I think it might be helpful 
to see the time decompostion for the checkpoint in the checkpoint history page 
in WEB UI to see which phase takes too long time.


Best,
 Yun


--Original Mail --
Sender:Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao mailto:yungao...@aliyun.com>>, 
user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject:RE: checkpointing seems to be throttled.
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any.


From: Yun Gao mailto:yungao...@aliyun.com>>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward mailto:edward.colle...@fmr.com>>; 
user@flink.apache.org<mailto:user@flink

Re: RE: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward,

   Are you setting the FSStateBackend via code or flink-conf.yaml ? If via code 
it requires a path parameter and the path would be the state.checkpoint.dir. If 
via flink-conf.yaml, I tried on 1.12 by setting   state.backend: filesystem in 
config file and enable checkpoint, it indeed threw an exception said 

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there are no backpressure, I think it might be helpful 
to see the time decompostion for the checkpoint in the checkpoint history page 
in WEB UI to see which phase takes too long time.


Best,
 Yun



 --Original Mail --
Sender:Colletta, Edward 
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao , user@flink.apache.org 

Subject:RE: checkpointing seems to be throttled.

Thanks for the quick response.
We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any. 
From: Yun Gao  
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.
This email is from an external source -exercise caution regarding links and 
attachments.
Hi Edward,

For the second issue, have you also set the statebackend type? I'm asking 
so because except for the default heap statebackend, other statebackends should 
throws exception if the state.checkpoint.dir is not set. Since heap 
statebackend stores all the snapshots in the JM's memory, it could not be 
recovered after JM failover, which makes it not suitable for production usage. 
Therefore, if used in production env then it might better to switch