Ethanlm commented on a change in pull request #3366: URL: https://github.com/apache/storm/pull/3366#discussion_r667217467
########## File path: bin/docker-to-squash.py ########## @@ -0,0 +1,1430 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +docker_to_squash.py is a tool to facilitate the process of converting +Docker images into squashFS layers, manifests, and configs. + +Tool dependencies: skopeo, squashfs-tools, tar, setfattr +""" + +import argparse +from collections import Iterable +import glob +import hashlib +import json +import logging +import os +import re +import shutil +import subprocess +from threading import Timer + +LOG_LEVEL = None +HADOOP_BIN_DIR = None + +def shell_command(command, print_stdout, print_stderr, raise_on_error, + timeout_sec=600): + global LOG_LEVEL + stdout_val = subprocess.PIPE + stderr_val = subprocess.PIPE + + logging.debug("command: %s", command) + + if print_stdout: + stdout_val = None + + if print_stderr or LOG_LEVEL == "DEBUG": + stderr_val = None + + process = None + try: + process = subprocess.Popen(command, stdout=stdout_val, + stderr=stderr_val) + timer = Timer(timeout_sec, process_timeout, [process]) + + timer.start() + out, err = process.communicate() + + if raise_on_error and process.returncode is not 0: + exception_string = ("Commmand: " + str(command) + + " failed with returncode: " + + str(process.returncode)) + if out != None: + exception_string = exception_string + "\nstdout: " + str(out) + if err != None: + exception_string = exception_string + "\nstderr: " + str(err) + raise Exception(exception_string) + + except: + if process and process.poll() is None: + process.kill() + raise Exception("Popen failure") + finally: + if timer: + timer.cancel() + + return out, err, process.returncode + +def process_timeout(process): + process.kill() + logging.error("Process killed due to timeout") + +def does_hdfs_entry_exist(entry, raise_on_error=True): + out, err, returncode = hdfs_ls(entry, raise_on_error=raise_on_error) + if returncode is not 0: + return False + return True + +def setup_hdfs_dirs(dirs): + if does_hdfs_entry_exist(dirs, raise_on_error=False): + return + + hdfs_mkdir(dirs, create_parents=True) + chmod_dirs = [] + for dir_entry in dirs: + directories = dir_entry.split("/")[1:] + dir_path = "" + for directory in directories: + dir_path = dir_path + "/" + directory + logging.info("dir_path: %s", str(dir_path)) + chmod_dirs.append(dir_path) + hdfs_chmod("755", chmod_dirs) + +def append_or_extend_to_list(src, src_list): + if isinstance(src, list): + src_list.extend(src) + else: + src_list.append(src) + +def hdfs_get(src, dest, print_stdout=False, print_stderr=False, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-get"] + append_or_extend_to_list(src, command) + command.append(dest) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False, + raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-ls"] + if options: + append_or_extend_to_list(options, command) + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, + raise_on_error) + return out, err, returncode + +def hdfs_cat(file_path, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cat"] + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True, + create_parents=False): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-mkdir"] + if create_parents: + command.append("-p") + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_rm(file_path, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-rm"] + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_put(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-put"] + if force: + command.append("-f") + append_or_extend_to_list(src, command) + command.append(dest) + out, err, returncode = shell_command(command, print_stdout, print_stderr, + raise_on_error, 60) + return out, err, returncode + +def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True, + recursive=False): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-chmod"] + if recursive: + command.append("-R") + command.append(mode) + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_setrep(replication, file_path, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-setrep", str(replication)] + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_cp(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cp"] + if force: + command.append("-f") + append_or_extend_to_list(src, command) + command.append(dest) + out, err, returncode = shell_command(command, print_stdout, print_stderr, + raise_on_error, 60) + return out, err, returncode + +def hdfs_touchz(file_path, print_stdout=False, print_stderr=True, + raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-touchz"] + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + + +def get_working_dir(directory): + try: + if os.path.isdir(directory): + working_dir = os.path.join(directory, "docker-to-squash") + else: + working_dir = directory + os.makedirs(working_dir) + except: + raise Exception("Could not create working_dir: " + working_dir) + return working_dir + +def is_sha256_hash(string): + if not re.findall(r"^[a-fA-F\d]{64,64}$", string): + return False + return True + +def calculate_file_hash(filename): + sha = hashlib.sha256() + with open(filename, 'rb') as file_pointer: + while True: + data = file_pointer.read(65536) + if not data: + break + sha.update(data) + hexdigest = sha.hexdigest() + if hexdigest == 0: + raise Exception("Hex digest for file: " + hexdigest + "returned 0") + return hexdigest + +def calculate_string_hash(string): + sha = hashlib.sha256() + sha.update(string) + return sha.hexdigest() + +def get_local_manifest_from_path(manifest_path): + with open(manifest_path, "rb") as file_pointer: + out = file_pointer.read() + manifest_hash = calculate_string_hash(str(out)) + manifest = json.loads(out) + return manifest, manifest_hash + +def get_hdfs_manifest_from_path(manifest_path): + out, err, returncode = hdfs_cat(manifest_path) + manifest_hash = calculate_string_hash(str(out)) + manifest = json.loads(out) + return manifest, manifest_hash + +def get_config_hash_from_manifest(manifest): + config_hash = manifest['config']['digest'].split(":", 1)[1] + return config_hash + +def check_total_layer_number(layers): + global MAX_IMAGE_LAYERS + if len(layers) > MAX_IMAGE_LAYERS: + logging.error("layers: " + str(layers)) + raise Exception("Image has " + str(len(layers)) + + " layers, which is more than the maximum " + str(MAX_IMAGE_LAYERS) + + " layers. Failing out") + +def check_total_layer_size(manifest, size): + global MAX_IMAGE_SIZE + if size > MAX_IMAGE_SIZE: + for layer in manifest['layers']: + logging.error("layer " + layer['digest'] + " has size " + str(layer['size'])) + raise Exception("Image has total size " + str(size) + + " B. which is more than the maximum size " + str(MAX_IMAGE_SIZE) + " B. Failing out") + +def get_layer_hashes_from_manifest(manifest, error_on_size_check=True): + layers = [] + size = 0; + + for layer in manifest['layers']: + layers.append(layer['digest'].split(":", 1)[1]) + size += layer['size'] + + if error_on_size_check: + check_total_layer_number(layers) + check_total_layer_size(manifest, size) + + return layers + +def get_pull_fmt_string(pull_format): + pull_fmt_string = pull_format + ":" + if pull_format == "docker": + pull_fmt_string = pull_fmt_string + "//" + return pull_fmt_string + +def get_manifest_from_docker_image(pull_format, image): + pull_fmt_string = get_pull_fmt_string(pull_format) + out, err, returncode = shell_command(["skopeo", "inspect", "--raw", pull_fmt_string + image], + False, True, True, 60) + manifest = json.loads(out) + if 'manifests' in manifest: + logging.debug("skopeo inspect --raw returned a list of manifests") + manifests_dict = manifest['manifests'] + sha = None + for mfest in manifests_dict: + if(mfest['platform']['architecture'] == "amd64"): + sha = mfest['digest'] + break + if not sha: + raise Exception("Could not find amd64 manifest for" + image) + + image_without_tag = image.split("/", 1)[-1].split(":", 1)[0] + image_and_sha = image_without_tag + "@" + sha + + logging.debug("amd64 manifest sha is: %s", sha) + + manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image_and_sha) + else: + manifest_hash = calculate_string_hash(str(out)) + + logging.debug("manifest: %s", str(manifest)) + return manifest, manifest_hash + +def split_image_and_tag(image_and_tag): + split = image_and_tag.split(",") + image = split[0] + tags = split[1:] + return image, tags + +def read_image_tag_to_hash(image_tag_to_hash): + hash_to_tags = dict() + tag_to_hash = dict() + with open(image_tag_to_hash, 'rb') as file_pointer: + while True: + line = file_pointer.readline() + if not line: + break + line = line.rstrip() + + if not line: + continue + + comment_split_line = line.split("#", 1) + line = comment_split_line[0] + comment = comment_split_line[1:] + + split_line = line.rsplit(":", 1) + manifest_hash = split_line[-1] + tags_list = ' '.join(split_line[:-1]).split(",") + + if not is_sha256_hash(manifest_hash) or not tags_list: + logging.warn("image-tag-to-hash file malformed. Skipping entry %s", line) + continue + + tags_and_comments = hash_to_tags.get(manifest_hash, None) + if tags_and_comments is None: + known_tags = tags_list + known_comment = comment + else: + known_tags = tags_and_comments[0] + for tag in tags_list: + if tag not in known_tags: + known_tags.append(tag) + known_comment = tags_and_comments[1] + known_comment.extend(comment) + + hash_to_tags[manifest_hash] = (known_tags, known_comment) + + for tag in tags_list: + cur_manifest = tag_to_hash.get(tag, None) + if cur_manifest is not None: + logging.warn("tag_to_hash already has manifest %s defined for tag %s." + + "This entry will be overwritten", cur_manifest, tag) + tag_to_hash[tag] = manifest_hash + return hash_to_tags, tag_to_hash + +def remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag): + if not hash_to_tags: + logging.debug("hash_to_tags is null. Not removing tag %s", tag) + return + + prev_hash = tag_to_hash.get(tag, None) + + if prev_hash is not None: + del tag_to_hash[tag] + prev_tags, prev_comment = hash_to_tags.get(prev_hash, (None, None)) + prev_tags.remove(tag) + if prev_tags == 0: + del hash_to_tags[prev_hash] + else: + hash_to_tags[prev_hash] = (prev_tags, prev_comment) + else: + logging.debug("Tag not found. Not removing tag: %s", tag) + +def remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash): + if not hash_to_tags: + logging.debug("hash_to_tags is null. Not removing image_hash %s", image_hash) + return + logging.debug("hash_to_tags: %s", str(hash_to_tags)) + logging.debug("Removing image_hash from dicts: %s", image_hash) + prev_tags, prev_comments = hash_to_tags.get(image_hash, None) + + if prev_tags is not None: + hash_to_tags.pop(image_hash) + for tag in prev_tags: + del tag_to_hash[tag] + +def add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment): + tag_to_hash[tag] = manifest_hash + new_tags_and_comments = hash_to_tags.get(manifest_hash, None) + if new_tags_and_comments is None: + new_tags = [tag] + new_comment = [comment] + else: + new_tags = new_tags_and_comments[0] + new_comment = new_tags_and_comments[1] + if tag not in new_tags: + new_tags.append(tag) + if comment and comment not in new_comment: + new_comment.append(comment) + hash_to_tags[manifest_hash] = (new_tags, new_comment) + +def write_local_image_tag_to_hash(image_tag_to_hash, hash_to_tags): + file_contents = [] + for key, value in hash_to_tags.iteritems(): + manifest_hash = key + tags = ','.join(map(str, value[0])) + if tags: + comment = ', '.join(map(str, value[1])) + if comment > 0: + comment = "#" + comment + file_contents.append(tags + ":" + manifest_hash + comment + "\n") + + file_contents.sort() + with open(image_tag_to_hash, 'w') as file_pointer: + for val in file_contents: + file_pointer.write(val) + +def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, + manifest_hash, comment): + for tag in tags: + update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment) + +def update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment): + remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag) + add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment) + +def remove_from_dicts(hash_to_tags, tag_to_hash, tags): + for tag in tags: + logging.debug("removing tag: %s", tag) + remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag) + +def populate_tag_dicts(hdfs_root, image_tag_to_hash, local_image_tag_to_hash): + + if does_hdfs_entry_exist(hdfs_root + "/" + image_tag_to_hash): + hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash) + image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash) + else: + image_tag_to_hash_hash = 0 + + if image_tag_to_hash_hash != 0: + hash_to_tags, tag_to_hash = read_image_tag_to_hash(local_image_tag_to_hash) + else: + hash_to_tags = {} + tag_to_hash = {} + return hash_to_tags, tag_to_hash, image_tag_to_hash_hash + + +def setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path): + logging.debug("Setting up squashfs hdfs_dirs: %s", str(hdfs_dirs)) + setup_hdfs_dirs(hdfs_dirs) + if not does_hdfs_entry_exist(image_tag_to_hash_path, raise_on_error=False): + hdfs_touchz(image_tag_to_hash_path) + hdfs_chmod("755", image_tag_to_hash_path) + +def skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir): + logging.info("Pulling image: %s", image) + if os.path.isdir(skopeo_dir): + raise Exception("Skopeo output directory already exists. " + + "Please delete and try again " + + "Directory: " + skopeo_dir) + pull_fmt_string = get_pull_fmt_string(pull_format) + shell_command(["skopeo", "copy", pull_fmt_string + image, + skopeo_format + ":" + skopeo_dir], False, True, True, 600) + +def untar_layer(tmp_dir, layer_path): + shell_command(["tar", "-C", tmp_dir, "--xattrs", + "--xattrs-include='*'", "-xf", layer_path], + False, True, True, 600) + +def tar_file_search(archive, target): + out, err, returncode = shell_command(["tar", "-xf", archive, target, "-O"], + False, False, False, 600) + return out + +def set_fattr(directory): + shell_command(["setfattr", "-n", "trusted.overlay.opaque", + "-v", "y", directory], False, True, True) + +def make_whiteout_block_device(file_path, whiteout): + shell_command(["mknod", "-m", "000", file_path, + "c", "0", "0"], False, True, True) + + out, err, returncode = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True) + perms = str(out).strip() + + shell_command(["chown", perms, file_path], False, True, True) + +def convert_oci_whiteouts(tmp_dir): + out, err, returncode = shell_command(["find", tmp_dir, "-name", ".wh.*"], + False, False, True, 60) + whiteouts = str(out).splitlines() + for whiteout in whiteouts: + if whiteout == 0: + continue + basename = os.path.basename(whiteout) + directory = os.path.dirname(whiteout) + if basename == ".wh..wh..opq": + set_fattr(directory) + else: + whiteout_string = ".wh." + idx = basename.rfind(whiteout_string) + bname = basename[idx+len(whiteout_string):] + file_path = os.path.join(directory, bname) + make_whiteout_block_device(file_path, whiteout) + shell_command(["rm", whiteout], False, True, True) + +def dir_to_squashfs(tmp_dir, squash_path): + shell_command(["/usr/sbin/mksquashfs", tmp_dir, squash_path, "-write-queue", "4096", + "-read-queue", "4096", "-fragment-queue", "4096"], + False, True, True, 600) + +def upload_to_hdfs(file_path, file_name, hdfs_dir, replication, mode, force=False): + dest = hdfs_dir + "/" + file_name + + if does_hdfs_entry_exist(dest, raise_on_error=False): + if not force: + logging.warn("Not uploading to HDFS. File already exists: %s", dest) + return + logging.info("File already exists, but overwriting due to force option: %s", dest) + + hdfs_put(file_path, dest, force) + hdfs_setrep(replication, dest) + hdfs_chmod(mode, dest) + logging.info("Uploaded file %s with replication %d and permissions %s", + dest, replication, mode) + +def atomic_upload_mv_to_hdfs(file_path, file_name, hdfs_dir, replication, image_tag_to_hash_file_hash): + global HADOOP_PREFIX + local_hash = calculate_file_hash(file_path) + if local_hash == image_tag_to_hash_file_hash: + logging.info("image_tag_to_hash file unchanged. Not uploading") + return + + tmp_file_name = file_name + ".tmp" + hdfs_tmp_path = hdfs_dir + "/" + tmp_file_name + hdfs_file_path = hdfs_dir + "/" + file_name + try: + if does_hdfs_entry_exist(hdfs_tmp_path, raise_on_error=False): + hdfs_rm(hdfs_tmp_path) + hdfs_put(file_path, hdfs_tmp_path) + hdfs_setrep(replication, hdfs_tmp_path) + hdfs_chmod("444", hdfs_tmp_path) + + jar_path = HADOOP_PREFIX + "/share/hadoop/tools/lib/hadoop-extras-*.jar" Review comment: yes. HADOOP_PREFIX is where hadoop is installed. The rest is standard. It is looking for the hadoop-extras jar, e.g.: ``` share/hadoop/tools/lib/hadoop-extras-2.10.1.19.2106161351.jar ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@storm.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org