Hi,
I'm getting an intermittent error when starting a Flink
application.

For context, this is running on YARN on AWS. The application converts
from the Table API to the DataStream API, so two Flink
applications/job managers try to start up. I think what happens is
that a REST API port is chosen, and it is the same for both Flink
apps. If YARN chooses two different instances for the two job managers,
each one works fine and starts its REST API on that port on its own
machine. But if YARN chooses the same instance for both job managers,
they both try to start the REST API on the same port on the same
machine, and I get the error.
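
To illustrate what I think is happening, here is a minimal sketch using
plain Python sockets. It is not Flink code; the names try_bind and demo
are made up for this example, and it only assumes that two processes on
one host cannot both listen on the same TCP port.

```python
import errno
import socket


def try_bind(port, host="127.0.0.1"):
    """Bind and listen on host:port.

    Returns the listening socket, or None if the port is already in use.
    (Hypothetical helper for illustration only.)
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
        sock.listen(1)
        return sock
    except OSError as exc:
        sock.close()
        if exc.errno == errno.EADDRINUSE:
            return None
        raise


def demo():
    """Simulate two job managers picking the same port on one machine."""
    # The first "job manager" gets a free port from the OS (port 0).
    first = try_bind(0)
    port = first.getsockname()[1]
    # The second "job manager" tries the very same port on the same host.
    second = try_bind(port)
    second_bound = second is not None
    if second is not None:
        second.close()
    first.close()
    return port, second_bound


if __name__ == "__main__":
    port, second_bound = demo()
    print(f"port {port}: second bind succeeded = {second_bound}")
```

The second bind fails here for the same reason I suspect the second job
manager fails: the port is already taken on that host. On two different
hosts the same port number would not clash.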

Here is the error:

2021-09-22 15:47:27,724 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
to initialize the cluster entrypoint YarnJobClusterEntrypoint.
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
[flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.util.FlinkException: Could not create the
DispatcherResourceManagerComponent.
        at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_282]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
~[hadoop-common-3.2.1-amzn-3.jar:?]
        at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        ... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on
any port in port range 35485
        at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_282]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
~[hadoop-common-3.2.1-amzn-3.jar:?]
        at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
        ... 2 more


Here is part of the log from the other job manager, which
successfully started its REST API on the same port a few seconds
earlier:


2021-09-22 15:47:20,690 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
Rest endpoint listening at ip-10-1-2-137.ec2.internal:35485
2021-09-22 15:47:20,691 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
http://ip-10-1-2-137.ec2.internal:35485 was granted leadership with
leaderSessionID=00000000-0000-0000-0000-000000000000
2021-09-22 15:47:20,692 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Web
frontend listening at http://ip-10-1-2-137.ec2.internal:35485.



Is there any configuration that would help with this? I considered
rest.bind-port, but the REST port already seems to be chosen
dynamically; that setting is commented out in my config file.


Thanks
