Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5584#discussion_r170922371 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -479,36 +479,60 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; - flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); + effectiveConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); + + final ApplicationId appId = yarnApplication.getApplicationSubmissionContext().getApplicationId(); - ApplicationReport report = startAppMaster( - new Configuration(flinkConfiguration), + // --------- Add Zookeeper namespace to effectiveConfiguration --------- + String zkNamespace = getZookeeperNamespace(); + // no user specified cli argument for namespace? + if (zkNamespace == null || zkNamespace.isEmpty()) { + // namespace defined in config? else use applicationId as default. + zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, appId.toString()); + setZookeeperNamespace(zkNamespace); --- End diff -- I think we could remove the whole `setZookeeperNamespace` and `getZookeeperNamespace` methods + field and simply rely on the fact that the input `Configuration` has the correct field set if there was a respective option set.
---