Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5584#discussion_r170921732 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -479,36 +479,60 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; - flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); + effectiveConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); + + final ApplicationId appId = yarnApplication.getApplicationSubmissionContext().getApplicationId(); - ApplicationReport report = startAppMaster( - new Configuration(flinkConfiguration), + // --------- Add Zookeeper namespace to effectiveConfiguration --------- + String zkNamespace = getZookeeperNamespace(); + // no user specified cli argument for namespace? + if (zkNamespace == null || zkNamespace.isEmpty()) { + // namespace defined in config? else use applicationId as default. + zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, appId.toString()); + setZookeeperNamespace(zkNamespace); + } + effectiveConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); + + effectiveConfiguration.setInteger( + ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, + clusterSpecification.getSlotsPerTaskManager()); + + // write out configuration file so that it can be uploaded + File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null); + tmpConfigurationFile.deleteOnExit(); + BootstrapTools.writeConfiguration(effectiveConfiguration, tmpConfigurationFile); --- End diff -- Why do we pull the writing of the configuration out of `startAppMaster`? It arguably belongs in the `startAppMaster` method, because then the temporary configuration file could also be cleaned up there after starting the application master.
---