Benchmark scripts simplification and update.

* Removed compression.
* Added a print out of the partition scheme.
* Added new queries to the list.
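For reference, the new partition scheme printout looks roughly like the
following (paths, counts, and column widths are illustrative, not actual
benchmark output; the widths are computed from the path length at runtime).
A runnable sketch of the scheme helpers follows the diffs below.

    ---------------- Partition Scheme --------------------
     Test: local_speed_up
     Virtual Partitions: 4
     Disks: 2
     Partitions: 2
    Index  Link Data Path                    Link Path
        0     0 /disk0/partitions/d0_p4_i0/  /disk0/data_links/local_speed_up/d0_p2_i0/
        1     0 /disk1/partitions/d1_p4_i0/  /disk0/data_links/local_speed_up/d0_p2_i0/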
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/e9805a66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/e9805a66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/e9805a66

Branch: refs/heads/prestonc/hash_join
Commit: e9805a66a44b980ae20c560bc8972c282bd80919
Parents: 2a9629f
Author: Preston Carman <[email protected]>
Authored: Thu Mar 13 23:15:10 2014 -0700
Committer: Preston Carman <[email protected]>
Committed: Thu Mar 13 23:15:10 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                | 105 ++++++++++++++-----
 .../noaa-ghcn-daily/scripts/weather_cli.py      |  21 ++--
 .../scripts/weather_convert_to_xml.py           |  11 +-
 .../scripts/weather_data_files.py               |  17 ++-
 4 files changed, 108 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/e9805a66/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 66f85d6..0500de3 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -34,7 +34,8 @@ class WeatherBenchmark:
 
     QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/"
     QUERY_MASTER_FOLDER = "../queries/"
-    QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq"]
+    QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq", "q06.xq"]
+    QUERY_UTILITY_LIST = ["sensor_count.xq", "station_count.xq", "q04_sensor.xq", "q04_station.xq", "q05_sensor.xq", "q05_station.xq", "q06_sensor.xq"]
     BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"]
     BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"]
     QUERY_COLLECTIONS = ["sensors", "stations"]
@@ -47,40 +48,73 @@ class WeatherBenchmark:
         self.dataset = dataset
         self.nodes = nodes
 
-    def build_data_links(self, xml_save_path):
+    def print_partition_scheme(self, xml_save_path):
         if (len(self.base_paths) == 0):
             return
         for test in self.dataset.get_tests():
             if test in self.BENCHMARK_LOCAL_TESTS:
-                self.build_local_data_links(test, xml_save_path)
+                self.print_local_partition_schemes(test, xml_save_path)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
-                self.build_cluster_data_links(test, xml_save_path)
+                self.print_cluster_partition_schemes(test, xml_save_path)
             else:
                 print "Unknown test."
                 exit()
 
-    def build_local_data_links(self, test, xml_save_path):
-        virtual_partitions = get_local_virtual_partitions(self.partitions)
-        data_paths = get_partition_paths(virtual_partitions, self.base_paths)
+    def print_local_partition_schemes(self, test, xml_save_path):
         for i in self.partitions:
-            link_base_paths = get_partition_paths(i, self.base_paths, "data_links/" + test)
-            # Match link paths to real data paths.
-            offset = 0
-            group_size = len(data_paths) / len(link_base_paths)
-            for link_index, link_path in enumerate(link_base_paths):
-                if os.path.isdir(link_path):
-                    shutil.rmtree(link_path)
-                for data_index, data_path in enumerate(data_paths):
-                    if offset <= data_index and data_index < offset + group_size:
-                        self.add_collection_links_for(data_path, link_path, data_index)
-                offset += group_size
+            scheme = self.get_local_partition_scheme(test, xml_save_path, i)
+            virtual_partitions = get_local_virtual_partitions(self.partitions)
+            self.print_partition_schemes(virtual_partitions, scheme, test, i)
+
+    def print_cluster_partition_schemes(self, test, xml_save_path):
+        scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+        virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
+        self.print_partition_schemes(virtual_partitions, scheme, test, 0)
+
+    def print_partition_schemes(self, virtual_partitions, scheme, test, partitions):
+        print
+        print "---------------- Partition Scheme --------------------"
+        print " Test: " + test
+        print " Virtual Partitions: " + str(virtual_partitions)
+        print " Disks: " + str(len(self.base_paths))
+        print " Partitions: " + str(partitions)
+
+        if len(scheme) > 0:
+            folder_length = len(scheme[0][3]) + 5
+            row_format = "{:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}"
+            HEADER = ("Index", "Link", "Data Path", "Link Path")
+            print row_format.format(*HEADER)
+            for row in scheme:
+                print row_format.format(*row)
+            print
+        else:
+            print " Scheme is EMPTY."
+
+    def get_local_partition_scheme(self, test, xml_save_path, partition):
+        scheme = []
+        virtual_partitions = get_local_virtual_partitions(self.partitions)
+        data_schemes = get_partition_scheme(virtual_partitions, self.base_paths)
+
+        link_base_schemes = get_partition_scheme(partition, self.base_paths, "data_links/" + test)
+        # Match link paths to real data paths.
+        offset = 0
+        group_size = len(data_schemes) / len(link_base_schemes)
+        for (link_disk, link_virtual, link_index, link_path) in link_base_schemes:
+            for (data_disk, data_virtual, data_index, data_path) in data_schemes:
+                if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size:
+                    scheme.append([data_index, link_index, data_path, link_path])
+                elif test == "local_batch_scale_out" and data_index == link_index:
+                    scheme.append([data_index, link_index, data_path, link_path])
+            offset += group_size
+        return scheme
 
-    def build_cluster_data_links(self, test, xml_save_path):
+    def get_cluster_partition_scheme(self, test, xml_save_path):
         node_index = self.get_current_node_index()
         if node_index == -1:
             print "Unknown host."
             return
+        scheme = []
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
         data_paths = get_partition_paths(virtual_partitions, self.base_paths)
         link_base_paths = get_cluster_link_paths(len(self.nodes), self.base_paths, "data_links/" + test)
@@ -88,8 +122,6 @@ class WeatherBenchmark:
         # Match link paths to real data paths.
         link_base_paths.sort()
         for link_index, link_path in enumerate(link_base_paths):
-            if os.path.isdir(link_path):
-                shutil.rmtree(link_path)
             # Prep
             link_offset = link_index % len(self.nodes)
             disk_offset = link_index // len(self.nodes)
@@ -111,8 +143,33 @@ class WeatherBenchmark:
             data_paths.sort()
             for data_index, data_path in enumerate(data_paths):
                 if has_data and node_offset <= data_index and data_index < node_offset + group_size:
-                    self.add_collection_links_for(data_path, link_path, data_index)
-            self.add_collection_links_for("", link_path, -1)
+                    scheme.append([data_index, link_index, data_path, link_path])
+            scheme.append([-1, link_index, "", link_path])
+        return scheme
+
+    def build_data_links(self, xml_save_path):
+        if (len(self.base_paths) == 0):
+            return
+        for test in self.dataset.get_tests():
+            if test in self.BENCHMARK_LOCAL_TESTS:
+                for i in self.partitions:
+                    scheme = self.get_local_partition_scheme(test, xml_save_path, i)
+                    self.build_data_links_scheme(scheme)
+            elif test in self.BENCHMARK_CLUSTER_TESTS:
+                scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+                self.build_data_links_scheme(scheme)
+            else:
+                print "Unknown test."
+                exit()
+
+    def build_data_links_scheme(self, scheme):
+        """Build all the data links based on the scheme information."""
+        link_path_cleared = []
+        for (data_index, partition, data_path, link_path) in scheme:
+            if link_path not in link_path_cleared and os.path.isdir(link_path):
+                shutil.rmtree(link_path)
+                link_path_cleared.append(link_path)
+            self.add_collection_links_for(data_path, link_path, data_index)
 
     def get_current_node_index(self):
         found = False
@@ -172,7 +229,7 @@ class WeatherBenchmark:
     def copy_and_replace_query(self, query_path, replacement_list):
         '''Copy the query files over to the query_path and replace the path for
         where the collection data is located.'''
-        for query_file in self.QUERY_FILE_LIST:
+        for query_file in self.QUERY_FILE_LIST + self.QUERY_UTILITY_LIST:
             shutil.copyfile(self.QUERY_MASTER_FOLDER + query_file, query_path + query_file)
 
         # Make a search replace for each collection.


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/e9805a66/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index a1a1aa2..52945e5 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -24,7 +24,6 @@ from weather_config import *
 from weather_benchmark import *
 
 DEBUG_OUTPUT = False
-COMPRESSED = False
 
 #
 # Weather conversion for GHCN-DAILY files to xml.
 #
@@ -42,7 +41,7 @@ def main(argv):
     xml_config_path = ""
 
     try:
-        opts, args = getopt.getopt(argv, "acf:hl:m:ruvw:x:", ["file=", "locality=", "max_station_files=", "web_service=", "xml_config="])
+        opts, args = getopt.getopt(argv, "af:hl:m:ruvw:x:", ["file=", "locality=", "max_station_files=", "web_service=", "xml_config="])
     except getopt.GetoptError:
         print 'The file options for weather_cli.py were not correctly specified.'
         print 'To see a full list of options try:'
@@ -52,10 +51,9 @@ def main(argv):
         if opt == '-h':
             print 'Converting weather daily files to xml options:'
             print ' -a Append the results to the progress file.'
-            print ' -c Compress the produced XML file with .gz.'
             print ' -f (str) The file name of a specific station to process.'
             print ' * Helpful when testing a single station\'s XML file output.'
-            print ' -l (str) Select the locality of the scripts execution (download, progress_file, sensor_build, station_build, partition, statistics).'
+            print ' -l (str) Select the locality of the scripts execution (download, progress_file, sensor_build, station_build, partition, partition_scheme, statistics).'
             print ' -m (int) Limits the number of files created for each station.'
             print ' * Helpful when testing to make sure all elements are supported for each station.'
             print ' Alternate form: --max_station_files=(int)'
@@ -67,9 +65,6 @@ def main(argv):
             sys.exit()
         elif opt in ('-a', "--append"):
             append = True
-        elif opt == '-c':
-            global COMPRESSED
-            COMPRESSED = True
         elif opt in ('-f', "--file"):
             # check if file exists.
             if os.path.exists(arg):
@@ -78,7 +73,7 @@ def main(argv):
             print 'Error: Argument must be a file name for --file (-f).'
             sys.exit()
         elif opt in ('-l', "--locality"):
-            if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "test_links", "queries", "statistics"):
+            if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "partition_scheme", "test_links", "queries", "statistics"):
                 section = arg
             else:
                 print 'Error: Argument must be a string for --locality (-l) and a valid locality.'
@@ -144,7 +139,7 @@ def main(argv):
         os.makedirs(xml_data_save_path)
 
     # Set up the XML build objects.
-    convert = WeatherWebServiceMonthlyXMLFile(download_path, xml_data_save_path, COMPRESSED, DEBUG_OUTPUT)
+    convert = WeatherWebServiceMonthlyXMLFile(download_path, xml_data_save_path, DEBUG_OUTPUT)
     progress_file = xml_data_save_path + "_data_progress.csv"
     data = WeatherDataFiles(ghcnd_data_dly_path, progress_file)
     if section in ("all", "progress_file"):
@@ -207,15 +202,19 @@ def main(argv):
             base_paths.append(paths + dataset_folder + "/")
         benchmark = WeatherBenchmark(base_paths, dataset.get_partitions(), dataset, config.get_node_machine_list())
 
-        if section in ("all", "partition"):
+        if section in ("all", "partition", "partition_scheme"):
             slices = benchmark.get_number_of_slices()
             print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').'
             data.reset()
-            data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
+            if section == "partition_scheme":
+                benchmark.print_partition_scheme(xml_data_save_path)
+            else:
+                data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
 
         if section in ("all", "test_links"):
             # TODO determine current node
             print 'Processing the test links section (' + dataset.get_name() + ').'
+            benchmark.print_partition_scheme(xml_data_save_path)
             benchmark.build_data_links(xml_data_save_path)
 
         if section in ("all", "queries"):


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/e9805a66/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
index a7adce5..c115efa 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
@@ -17,7 +17,6 @@ import textwrap
 from datetime import date
 import os
-import gzip
 from collections import OrderedDict
 
 # Custom modules.
@@ -104,9 +103,8 @@ class WeatherConvertToXML:
 
     token = ""
 
-    def __init__(self, base_path, save_path, compressed, debug_output):
+    def __init__(self, base_path, save_path, debug_output):
         self.save_path = save_path
-        self.compressed = compressed
         self.debug_output = debug_output
 
         # Extra support files.
@@ -134,14 +132,9 @@ class WeatherConvertToXML:
             print str(field[FIELD_INDEX_NAME]) + " = '" + row[(field[FIELD_INDEX_START] - 1):field[FIELD_INDEX_END]] + "'"
 
     def save_file(self, filename, contents):
-        if self.compressed:
-            filename = filename + '.gz'
-            file = gzip.open(filename, 'wb')
-        else:
-            file = open(filename, 'w')
+        file = open(filename, 'w')
         file.write(contents)
         file.close()
-        return filename
 
     def get_folder_size(self, folder_name):


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/e9805a66/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index a9b4ecc..a68cba1 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -318,13 +318,26 @@ class WeatherDataFiles:
 
         return columns[self.INDEX_DATA_FILE_NAME]
 
+# Index values for the fields of each partition scheme tuple.
+PARTITION_INDEX_DISK = 0
+PARTITION_INDEX_VIRTUAL = 1
+PARTITION_INDEX = 2
+PARTITION_INDEX_PATH = 3
+PARTITION_HEADER = ("Disk", "Virtual", "Index", "Path")
+
 def get_partition_paths(partitions, base_paths, key="partitions"):
     partition_paths = []
+    for scheme in get_partition_scheme(partitions, base_paths, key):
+        partition_paths.append(scheme[PARTITION_INDEX_PATH])
+    return partition_paths
+
+def get_partition_scheme(partitions, base_paths, key="partitions"):
+    partition_scheme = []
     for i in range(0, partitions):
         for j in range(0, len(base_paths)):
             new_partition_path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/"
-            partition_paths.append(new_partition_path)
-    return partition_paths
+            partition_scheme.append((j, partitions, i, new_partition_path))
+    return partition_scheme
 
 def get_partition_folder(disks, partitions, index):
     return "d" + str(disks) + "_p" + str(partitions) + "_i" + str(index)
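As a sanity check on the new helpers, here is a minimal runnable sketch of
get_partition_scheme() and a caller unpacking its (disk, virtual, index, path)
tuples, mirroring the functions added to weather_data_files.py above. The two
base paths in the driver are made up for illustration; the real ones come from
the benchmark XML config.

    # Standalone sketch; mirrors weather_data_files.py (illustrative paths).
    def get_partition_folder(disk, partitions, index):
        return "d" + str(disk) + "_p" + str(partitions) + "_i" + str(index)

    def get_partition_scheme(partitions, base_paths, key="partitions"):
        # One (disk, virtual, index, path) tuple per disk/partition pair.
        partition_scheme = []
        for i in range(0, partitions):
            for j in range(0, len(base_paths)):
                path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/"
                partition_scheme.append((j, partitions, i, path))
        return partition_scheme

    if __name__ == "__main__":
        # Hypothetical two-disk layout; the tuples unpack directly, so no
        # enumerate() is needed when looping over a scheme.
        for (disk, virtual, index, path) in get_partition_scheme(2, ["/d0/", "/d1/"]):
            print("d%d v%d i%d %s" % (disk, virtual, index, path))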
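Usage-wise, the new locality slots into the existing CLI: passing
partition_scheme to -l should print the scheme without copying any data (the
config file name here is hypothetical):

    python weather_cli.py -l partition_scheme -x weather_config.xml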
