The layout submodule of the qcow2 package creates a random valid image, randomly selects a subset of its fields, fuzzes them and writes the fuzzed image to a file. The fuzzing process can be controlled by an external configuration.
# Reviewed-by: Stefan Hajnoczi <stefa...@redhat.com>
# Signed-off-by: Maria Kustova <mari...@catit.be>
#
# File: tests/image-fuzzer/qcow2/layout.py
#
# Generator of fuzzed qcow2 images
#
# Copyright (C) 2014 Maria Kustova <mari...@catit.be>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import random
import struct
import fuzz

MAX_IMAGE_SIZE = 10 * (1 << 20)
# Standard sizes
UINT32_S = 4
UINT64_S = 8


def _to_bytes(value):
    """Return text as UTF-8 encoded bytes; any other value is returned as is.

    struct's 's' format packs byte strings only, so text values (``unicode``
    on Python 2, ``str`` on Python 3) are encoded before being measured or
    packed.
    """
    if isinstance(value, type(u'')):
        return value.encode('utf-8')
    return value


class Field(object):

    """Atomic image element (field).

    The class represents an image field as quadruple of a data format
    of value necessary for its packing to binary form, an offset from
    the beginning of the image, a value and a name.

    The field can be iterated as a list [format, offset, value].
    """

    __slots__ = ('fmt', 'offset', 'value', 'name')

    def __init__(self, fmt, offset, val, name):
        self.fmt = fmt
        self.offset = offset
        self.value = val
        self.name = name

    def __iter__(self):
        return iter([self.fmt, self.offset, self.value])

    def __repr__(self):
        return "Field(fmt='%s', offset=%d, value=%s, name=%s)" % \
            (self.fmt, self.offset, str(self.value), self.name)


class FieldsList(object):

    """List of fields.

    The class allows access to a field in the list by its name and joins
    several lists in one via in-place addition.
    """

    def __init__(self, meta_data=None):
        """Build the list from an iterable of [fmt, offset, value, name]
        records; without an argument the list is empty.
        """
        if meta_data is None:
            self.data = []
        else:
            self.data = [Field(f[0], f[1], f[2], f[3])
                         for f in meta_data]

    def __getitem__(self, name):
        """Return a list of all fields with the given name (possibly empty)."""
        return [x for x in self.data if x.name == name]

    def __iter__(self):
        return iter(self.data)

    def __iadd__(self, other):
        self.data += other.data
        return self

    def __len__(self):
        return len(self.data)


class Image(object):

    """ Qcow2 image object.

    This class allows to create qcow2 images with random valid structures and
    values, fuzz them via external qcow2.fuzz module and write the result to
    a file.
    """

    @staticmethod
    def _size_params():
        """Generate a random image size aligned to a random correct
        cluster size.

        Return a (cluster_bits, img_size) tuple.
        """
        cluster_bits = random.randrange(9, 21)
        cluster_size = 1 << cluster_bits
        img_size = random.randrange(0, MAX_IMAGE_SIZE + 1, cluster_size)
        return (cluster_bits, img_size)

    @staticmethod
    def _header(cluster_bits, img_size, backing_file_name=None):
        """Generate a random valid header.

        Return a FieldsList of header fields. If 'backing_file_name' is
        given and fits into the first cluster, its size and offset are
        recorded in the header.
        """
        meta_header = [
            # Byte literal: identical to the native str on Python 2 and
            # the only type accepted by struct's 's' format on Python 3
            ['>4s', 0, b"QFI\xfb", 'magic'],
            ['>I', 4, random.randint(2, 3), 'version'],
            ['>Q', 8, 0, 'backing_file_offset'],
            ['>I', 16, 0, 'backing_file_size'],
            ['>I', 20, cluster_bits, 'cluster_bits'],
            ['>Q', 24, img_size, 'size'],
            ['>I', 32, 0, 'crypt_method'],
            ['>I', 36, 0, 'l1_size'],
            ['>Q', 40, 0, 'l1_table_offset'],
            ['>Q', 48, 0, 'refcount_table_offset'],
            ['>I', 56, 0, 'refcount_table_clusters'],
            ['>I', 60, 0, 'nb_snapshots'],
            ['>Q', 64, 0, 'snapshots_offset'],
            ['>Q', 72, 0, 'incompatible_features'],
            ['>Q', 80, 0, 'compatible_features'],
            ['>Q', 88, 0, 'autoclear_features'],
            # Only refcount_order = 4 is supported by current (07.2014)
            # implementation of QEMU
            ['>I', 96, 4, 'refcount_order'],
            ['>I', 100, 0, 'header_length']
        ]
        v_header = FieldsList(meta_header)

        if v_header['version'][0].value == 2:
            v_header['header_length'][0].value = 72
        else:
            v_header['incompatible_features'][0].value = random.getrandbits(2)
            v_header['compatible_features'][0].value = random.getrandbits(1)
            v_header['header_length'][0].value = 104

        max_header_len = struct.calcsize(v_header['header_length'][0].fmt) + \
            v_header['header_length'][0].offset
        end_of_extension_area_len = 2 * UINT32_S
        free_space = (1 << cluster_bits) - (max_header_len +
                                            end_of_extension_area_len)
        # If the backing file name specified and there is enough space for it
        # in the first cluster, then it's placed in the very end of the first
        # cluster.
        if (backing_file_name is not None) and \
           (free_space >= len(backing_file_name)):
            v_header['backing_file_size'][0].value = len(backing_file_name)
            v_header['backing_file_offset'][0].value = (1 << cluster_bits) - \
                len(backing_file_name)

        return v_header

    @staticmethod
    def _backing_file_name(header, backing_file_name=None):
        """Add the name of the backing file at the offset specified
        in the header.

        Return an empty FieldsList if no name is given or the header has
        no backing file offset (the name did not fit).
        """
        if (backing_file_name is not None) and \
           (not header['backing_file_offset'][0].value == 0):
            data_len = len(backing_file_name)
            data_fmt = '>' + str(data_len) + 's'
            data_field = FieldsList([
                [data_fmt, header['backing_file_offset'][0].value,
                 backing_file_name, 'bf_name']
            ])
        else:
            data_field = FieldsList()

        return data_field

    @staticmethod
    def _ext_free_space(header, offset):
        """Return the number of bytes available for a header extension
        starting at 'offset'.

        The area ends at the backing file name if the header records one,
        otherwise at the last byte of the first cluster. Room for the
        mandatory end-of-extension-area marker is always reserved.
        """
        end_of_extension_area_len = 2 * UINT32_S
        high_border = (header['backing_file_offset'][0].value or
                       ((1 << header['cluster_bits'][0].value) - 1)) - \
            end_of_extension_area_len
        return high_border - offset

    @staticmethod
    def _backing_file_format(header, backing_file_fmt=None):
        """Generate the header extension for the backing file
        format.

        Return an (extension, offset) tuple where 'offset' is the position
        right after the generated extension, or right after the header if
        no extension was generated.
        """
        ext = FieldsList()
        offset = struct.calcsize(header['header_length'][0].fmt) + \
            header['header_length'][0].offset

        if backing_file_fmt is not None:
            free_space = Image._ext_free_space(header, offset)
            # Extension header plus data padded to a multiple of 8 bytes
            ext_size = 2 * UINT32_S + ((len(backing_file_fmt) + 7) & ~7)

            if free_space >= ext_size:
                ext_data_len = len(backing_file_fmt)
                ext_data_fmt = '>' + str(ext_data_len) + 's'
                ext_padding_len = 7 - (ext_data_len - 1) % 8
                ext = FieldsList([
                    ['>I', offset, 0xE2792ACA, 'ext_magic'],
                    ['>I', offset + UINT32_S, ext_data_len, 'ext_length'],
                    [ext_data_fmt, offset + UINT32_S * 2, backing_file_fmt,
                     'bf_format']
                ])
                offset = ext['bf_format'][0].offset + \
                    struct.calcsize(ext['bf_format'][0].fmt) + \
                    ext_padding_len
        return (ext, offset)

    @staticmethod
    def _feature_name_table(header, offset):
        """Generate a random header extension for names of features used in
        the image.

        Return an (extension, offset) tuple where 'offset' is the position
        right after the generated extension. If there is no room for even
        one entry, the extension is empty and 'offset' is unchanged.
        """
        def gen_feat_ids():
            """Return random feature type and feature bit."""
            return (random.randint(0, 2), random.randint(0, 63))

        free_space = Image._ext_free_space(header, offset)
        # Sum of sizes of 'magic' and 'length' header extension fields
        ext_header_len = 2 * UINT32_S
        fnt_entry_size = 6 * UINT64_S
        # Floor division keeps the count integral; clamping to zero prevents
        # a negative count (free space smaller than the extension header)
        # from emitting an extension that overlaps the following data
        num_fnt_entries = max(0, min(10, (free_space - ext_header_len) //
                                     fnt_entry_size))
        if num_fnt_entries > 0:
            feature_tables = []
            feature_ids = set()
            inner_offset = offset + ext_header_len
            feat_name = b'some cool feature'
            feat_fmt = '>' + str(len(feat_name)) + 's'
            for _ in range(num_fnt_entries):
                feat_type, feat_bit = gen_feat_ids()
                # Remove duplicates
                while (feat_type, feat_bit) in feature_ids:
                    feat_type, feat_bit = gen_feat_ids()
                feature_ids.add((feat_type, feat_bit))
                feature_tables += [['B', inner_offset,
                                    feat_type, 'feature_type'],
                                   ['B', inner_offset + 1, feat_bit,
                                    'feature_bit_number'],
                                   [feat_fmt, inner_offset + 2,
                                    feat_name, 'feature_name']
                                   ]
                inner_offset += fnt_entry_size
            # No padding for the extension is necessary, because
            # the extension length is multiple of 8
            ext = FieldsList([
                ['>I', offset, 0x6803f857, 'ext_magic'],
                # One feature table contains 3 fields and takes 48 bytes
                ['>I', offset + UINT32_S, len(feature_tables) // 3 * 48,
                 'ext_length']
            ] + feature_tables)
            offset = inner_offset
        else:
            ext = FieldsList()

        return (ext, offset)

    @staticmethod
    def _end_of_extension_area(offset):
        """Generate a mandatory header extension marking end of header
        extensions.
        """
        ext = FieldsList([
            ['>I', offset, 0, 'ext_magic'],
            ['>I', offset + UINT32_S, 0, 'ext_length']
        ])
        return ext

    def __init__(self, backing_file_name=None, backing_file_fmt=None):
        """Create a random valid qcow2 image with the correct inner structure
        and allowable values.
        """
        # Sizes and offsets are computed in bytes, so text arguments are
        # converted to byte strings up front
        backing_file_name = _to_bytes(backing_file_name)
        backing_file_fmt = _to_bytes(backing_file_fmt)
        # Image size is saved as an attribute for the runner needs
        cluster_bits, self.image_size = self._size_params()
        # Saved as an attribute, because it's necessary for writing
        self.cluster_size = 1 << cluster_bits
        self.header = self._header(cluster_bits, self.image_size,
                                   backing_file_name)
        self.backing_file_name = self._backing_file_name(self.header,
                                                         backing_file_name)
        self.backing_file_format, \
            offset = self._backing_file_format(self.header,
                                               backing_file_fmt)
        self.feature_name_table, \
            offset = self._feature_name_table(self.header, offset)
        self.end_of_extension_area = self._end_of_extension_area(offset)
        # Container for entire image
        self.data = FieldsList()
        # Percentage of fields will be fuzzed
        self.bias = random.uniform(0.2, 0.5)

    def __iter__(self):
        return iter([self.header,
                     self.backing_file_format,
                     self.feature_name_table,
                     self.end_of_extension_area,
                     self.backing_file_name])

    def _join(self):
        """Join all image structure elements as header, tables, etc in one
        list of fields.

        The joined list is built only once; later calls are no-ops.
        """
        if len(self.data) == 0:
            for v in self:
                self.data += v

    def fuzz(self, fields_to_fuzz=None):
        """Fuzz an image by corrupting values of a random subset of its fields.

        Without parameters the method fuzzes an entire image.
        If 'fields_to_fuzz' is specified then only fields in this list will be
        fuzzed. 'fields_to_fuzz' can contain both individual fields and more
        general image elements as a header or tables.
        In the first case the field will be fuzzed always.
        In the second a random subset of fields will be selected and fuzzed.
        """
        def coin():
            """Return boolean value proportional to a portion of fields to be
            fuzzed.
            """
            return random.random() < self.bias

        if fields_to_fuzz is None:
            self._join()
            for field in self.data:
                if coin():
                    field.value = getattr(fuzz, field.name)(field.value)
        else:
            for item in fields_to_fuzz:
                if len(item) == 1:
                    # Whole image element: fuzz a random subset of its fields
                    for field in getattr(self, item[0]):
                        if coin():
                            field.value = getattr(fuzz,
                                                  field.name)(field.value)
                else:
                    # (element, field name) pair: always fuzz the field
                    for field in getattr(self, item[0])[item[1]]:
                        try:
                            field.value = getattr(fuzz, field.name)(
                                field.value)
                        except AttributeError:
                            # Some fields can be skipped depending on
                            # references, e.g. FNT header extension is not
                            # generated for a feature mask header field
                            # equal to zero
                            pass

    def write(self, filename):
        """Write an entire image to the file.

        Every field is packed at its offset and the file is then extended
        with a trailing zero byte to a multiple of the cluster size.
        """
        self._join()
        # Binary mode: the image is raw bytes and must not go through
        # newline translation or text encoding. The context manager closes
        # the file even if packing a fuzzed value fails.
        with open(filename, 'wb') as image_file:
            for field in self.data:
                image_file.seek(field.offset)
                image_file.write(struct.pack(field.fmt,
                                             _to_bytes(field.value)))
            image_file.seek(0, 2)
            size = image_file.tell()
            rounded = (size + self.cluster_size - 1) & \
                ~(self.cluster_size - 1)
            if rounded > size:
                image_file.seek(rounded - 1)
                image_file.write(b"\0")


def create_image(test_img_path, backing_file_name=None, backing_file_fmt=None,
                 fields_to_fuzz=None):
    """Create a fuzzed image and write it to the specified file.

    Return the virtual size of the generated image.
    """
    image = Image(backing_file_name, backing_file_fmt)
    image.fuzz(fields_to_fuzz)
    image.write(test_img_path)
    return image.image_size