Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5584#discussion_r170922371
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
    @@ -479,36 +479,60 @@ public void terminateCluster(ApplicationId 
applicationId) throws FlinkException
                        ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;
     
    -           flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, 
executionMode.toString());
    +           
effectiveConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, 
executionMode.toString());
    +
    +           final ApplicationId appId = 
yarnApplication.getApplicationSubmissionContext().getApplicationId();
     
    -           ApplicationReport report = startAppMaster(
    -                   new Configuration(flinkConfiguration),
    +           // --------- Add Zookeeper namespace to effectiveConfiguration 
---------
    +           String zkNamespace = getZookeeperNamespace();
    +           // no user specified cli argument for namespace?
    +           if (zkNamespace == null || zkNamespace.isEmpty()) {
    +                   // namespace defined in config? else use applicationId 
as default.
    +                   zkNamespace = 
flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, 
appId.toString());
    +                   setZookeeperNamespace(zkNamespace);
    --- End diff --
    
    I think we could remove the whole `setZookeeperNamespace` and 
`getZookeeperNamespace` methods + field and simply rely on the fact that the 
input `Configuration` has the correct field set if there was a respective 
option set.


---

Reply via email to