wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] 
Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409308841
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
        }
 
        /**
-        * To get nodePort of configured ports.
+        * Get rest port from the external Service.
         */
-       private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-               final int port = this.flinkConfig.getInteger(configPort);
-               if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-                       for (ServicePort p : service.getSpec().getPorts()) {
-                               if (p.getPort() == port) {
-                                       return p.getNodePort();
-                               }
-                       }
+       private int getRestPortFromExternalService(Service externalService) {
+               final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+                       .stream()
+                       .filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+                       .collect(Collectors.toList());
+
+               if (servicePortCandidates.isEmpty()) {
+                       throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+                               KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+               }
+
+               final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+               final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+                       KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+               switch (externalServiceType) {
+                       case ClusterIP:
+                       case LoadBalancer:
 
 Review comment:
   I am not sure whether you really want to separate `LoadBalancer` and 
`NodePort` completely. If so, we should wait until the load balancer is ready 
and return `EXTERNAL-IP:8081` as the JobManager URL. Otherwise, the Flink 
client will time out and should clean up the cluster resources.
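
   A minimal sketch of the waiting step described above, assuming the external address can be polled through a supplier. `LoadBalancerEndpointWaiter`, `waitForRestEndpoint`, and all parameter names are hypothetical and not part of `Fabric8FlinkKubeClient`. In the real client the supplier would presumably read the Service's load balancer status.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of Fabric8FlinkKubeClient. */
final class LoadBalancerEndpointWaiter {

    private LoadBalancerEndpointWaiter() {}

    /**
     * Polls the supplier until it reports an external address, then returns
     * "address:restPort". Throws if the timeout elapses first, so the caller
     * can clean up the cluster resources.
     */
    static String waitForRestEndpoint(
            Supplier<Optional<String>> externalAddress, // assumption: reads the Service status
            int restPort,
            Duration timeout,
            Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            final Optional<String> address = externalAddress.get();
            if (address.isPresent()) {
                return address.get() + ":" + restPort;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new RuntimeException(
                    "Load balancer was not ready within " + timeout.toMillis() + " ms");
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before giving up.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for the load balancer", e);
            }
        }
    }
}
```

   The caller would catch the exception and delete the created resources instead of handing an unreachable URL back to the user.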
   
   Then, for an unmanaged K8s cluster without a load balancer, we would require 
users to set `kubernetes.rest-service.exposed.type=NodePort` explicitly, rather 
than returning a confusing JobManager URL such as `MASTER_ADDRESS:8081`. 
Submission against that URL will also always time out.
