Added a reset option to partitions and queries. The new option allows all existing data to be removed before the file copies are run, giving the data a clean start.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/2bc25a10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/2bc25a10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/2bc25a10 Branch: refs/heads/prestonc/benchmarks_staging Commit: 2bc25a10517d80c2d9277b2be8ab73e6557ad1c2 Parents: 9456d4a Author: Preston Carman <[email protected]> Authored: Thu Mar 6 20:04:52 2014 -0800 Committer: Preston Carman <[email protected]> Committed: Thu Mar 6 20:04:52 2014 -0800 ---------------------------------------------------------------------- .../scripts/weather_benchmark.py | 23 +++++++++----------- .../noaa-ghcn-daily/scripts/weather_cli.py | 4 ++-- .../scripts/weather_convert_to_xml.py | 2 +- .../scripts/weather_data_files.py | 14 ++++++++---- 4 files changed, 23 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py index 8a032c2..66f85d6 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py @@ -68,7 +68,9 @@ class WeatherBenchmark: offset = 0 group_size = len(data_paths) / len(link_base_paths) for link_index, link_path in enumerate(link_base_paths): - for data_index, data_path in enumerate(data_paths): + if os.path.isdir(link_path): + shutil.rmtree(link_path) + for data_index, data_path in enumerate(data_paths): if offset <= data_index and data_index < offset + group_size: 
self.add_collection_links_for(data_path, link_path, data_index) offset += group_size @@ -134,38 +136,34 @@ class WeatherBenchmark: if index >= 0: os.symlink(real_path + collection + "/", collection_path + "index" + str(index)) - def copy_query_files(self): + def copy_query_files(self, reset): for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: - self.copy_local_query_files(test) + self.copy_local_query_files(test, reset) elif test in self.BENCHMARK_CLUSTER_TESTS: - self.copy_cluster_query_files(test) + self.copy_cluster_query_files(test, reset) else: print "Unknown test." exit() - def copy_cluster_query_files(self, test): + def copy_cluster_query_files(self, test, reset): '''Determine the data_link path for cluster query files and copy with new location for collection.''' partitions = self.dataset.get_partitions()[0] for i in range(len(self.nodes)): query_path = get_cluster_query_path(self.base_paths, test, i) - - if not os.path.isdir(query_path): - os.makedirs(query_path) + prepare_path(query_path, reset) # Copy query files. partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test) self.copy_and_replace_query(query_path, partition_paths) - def copy_local_query_files(self, test): + def copy_local_query_files(self, test, reset): '''Determine the data_link path for local query files and copy with new location for collection.''' for i in self.partitions: query_path = get_local_query_path(self.base_paths, test, i) - - if not os.path.isdir(query_path): - os.makedirs(query_path) + prepare_path(query_path, reset) # Copy query files. partition_paths = get_partition_paths(i, self.base_paths, "data_links/" + test) @@ -200,7 +198,6 @@ class WeatherBenchmark: print "Unknown test." 
exit() - def get_cluster_link_paths(nodes, base_paths, key="partitions"): link_paths = [] for i in range(0, nodes): http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py index 8d18607..a1a1aa2 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py @@ -211,7 +211,7 @@ def main(argv): slices = benchmark.get_number_of_slices() print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').' data.reset() - data.copy_to_n_partitions(xml_data_save_path, slices, base_paths) + data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) if section in ("all", "test_links"): # TODO determine current node @@ -220,7 +220,7 @@ def main(argv): if section in ("all", "queries"): print 'Processing the queries section (' + dataset.get_name() + ').' - benchmark.copy_query_files() + benchmark.copy_query_files(reset) # if section in ("statistics"): # print 'Processing the statistics section.' 
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py index 1aee4a7..a7adce5 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py @@ -376,7 +376,7 @@ class WeatherConvertToXML: xml_station += self.default_xml_element("elevation", elevation, 2) state_code = self.get_field_from_definition(station_ghcnd_row, STATIONS_FIELDS['STATE']).strip() - if state_code != "": + if state_code != "" and state_code in self.STATES: xml_station += self.default_xml_location_labels("ST", "FIPS:" + str(self.STATES.keys().index(state_code)), self.STATES[state_code]) # Add the MSHR data to the station generated information. 
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py index da2afcc..a9b4ecc 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py @@ -92,7 +92,7 @@ class WeatherDataFiles: self.close_progress_data(True) self.reset() - def copy_to_n_partitions(self, save_path, partitions, base_paths=[]): + def copy_to_n_partitions(self, save_path, partitions, base_paths, reset): """Once the initial data has been generated, the data can be copied into a set number of partitions. """ if (len(base_paths) == 0): return @@ -103,8 +103,7 @@ class WeatherDataFiles: for path in partition_paths: partition_sizes.append(0) # Make sure the xml folder is available. - if not os.path.isdir(path): - os.makedirs(path) + prepare_path(path, reset) # copy stations and sensors into each partition current_partition = 0 @@ -125,7 +124,6 @@ class WeatherDataFiles: if os.path.isdir(file_path): distutils.dir_util.copy_tree(file_path, new_file_path) partition_sizes[current_partition] += size - # Copy station files type = "stations" @@ -319,6 +317,7 @@ class WeatherDataFiles: break return columns[self.INDEX_DATA_FILE_NAME] + def get_partition_paths(partitions, base_paths, key="partitions"): partition_paths = [] for i in range(0, partitions): @@ -330,4 +329,11 @@ def get_partition_paths(partitions, base_paths, key="partitions"): def get_partition_folder(disks, partitions, index): return "d" + str(disks) + "_p" + str(partitions) + "_i" + str(index) +def prepare_path(path, reset): + """Ensures the directory is available. 
If reset, then its a brand new directory.""" + if os.path.isdir(path) and reset: + shutil.rmtree(path) + + if not os.path.isdir(path): + os.makedirs(path)
