Repository: ambari Updated Branches: refs/heads/branch-2.4 7116c0132 -> 494795607
AMBARI-18862. KAFKA broker start failed during restart stale config services after updating log directory.(vbrodetskyi) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/49479560 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/49479560 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/49479560 Branch: refs/heads/branch-2.4 Commit: 494795607c52a373db68099b472e5ede707d1794 Parents: 7116c01 Author: Vitaly Brodetskyi <vbrodets...@hortonworks.com> Authored: Fri Nov 11 06:48:43 2016 +0200 Committer: Vitaly Brodetskyi <vbrodets...@hortonworks.com> Committed: Fri Nov 11 06:48:43 2016 +0200 ---------------------------------------------------------------------- .../KAFKA/0.8.1/package/scripts/kafka.py | 26 ++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/49479560/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py index ac7b0ae..6b4579c 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py @@ -65,15 +65,15 @@ def kafka(upgrade_type=None): kafka_server_config['listeners'] = listeners.replace("6667", port) Logger.info(format("Kafka listeners after the port update: {listeners}")) del kafka_server_config['port'] - - + + if effective_version is not None and effective_version != "" and \ check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version): if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts: brokerid = str(sorted(params.kafka_hosts).index(params.hostname)) kafka_server_config['broker.id'] = brokerid Logger.info(format("Calculating broker.id as {brokerid}")) - + # listeners and advertised.listeners are only added in 2.3.0.0 onwards. if effective_version is not None and effective_version != "" and \ check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version): @@ -215,9 +215,15 @@ def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir): if backup_folder_path: # Restore backed up files to current relevant dirs if needed - will be triggered only when changing to/from default path; for file in os.listdir(backup_folder_path): - File(os.path.join(kafka_managed_dir,file), - owner=params.kafka_user, - content = StaticFile(os.path.join(backup_folder_path,file))) + if os.path.isdir(os.path.join(backup_folder_path, file)): + Execute(('cp', '-r', os.path.join(backup_folder_path, file), kafka_managed_dir), + sudo=True) + Execute(("chown", "-R", format("{kafka_user}:{user_group}"), os.path.join(kafka_managed_dir, file)), + sudo=True) + else: + File(os.path.join(kafka_managed_dir,file), + owner=params.kafka_user, + content = StaticFile(os.path.join(backup_folder_path,file))) # Clean up backed up folder Directory(backup_folder_path, @@ -239,7 +245,13 @@ def backup_dir_contents(dir_path, backup_folder_suffix): ) # Safely copy top-level contents to backup folder for file in os.listdir(dir_path): - File(os.path.join(backup_destination_path, file), + if os.path.isdir(os.path.join(dir_path, file)): + Execute(('cp', '-r', os.path.join(dir_path, file), backup_destination_path), + sudo=True) + Execute(("chown", "-R", format("{kafka_user}:{user_group}"), os.path.join(backup_destination_path, file)), + sudo=True) + else: + File(os.path.join(backup_destination_path, file), owner=params.kafka_user, content = StaticFile(os.path.join(dir_path,file)))