Benchmark cleanup, and add an option for creating partitions of large files instead of the original small files.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/17a08527 Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/17a08527 Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/17a08527 Branch: refs/heads/prestonc/benchmark Commit: 17a08527c4b689c67118445f7e326cbd50f36a32 Parents: 22f8441 Author: Preston Carman <[email protected]> Authored: Thu May 8 14:08:04 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Thu May 8 14:15:35 2014 -0700 ---------------------------------------------------------------------- .../scripts/weather_benchmark.py | 53 ++--- .../noaa-ghcn-daily/scripts/weather_cli.py | 7 +- .../noaa-ghcn-daily/scripts/weather_config.py | 12 +- .../scripts/weather_convert_to_xml.py | 200 +------------------ .../scripts/weather_data_files.py | 107 +++++----- 5 files changed, 102 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py index daae6b2..b862b31 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py @@ -32,6 +32,8 @@ from weather_data_files import * # logs/ class WeatherBenchmark: + DATA_LINKS_FOLDER = "data_links/" + LARGE_FILE_ROOT_TAG = WeatherDataFiles.LARGE_FILE_ROOT_TAG QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/" QUERY_MASTER_FOLDER = "../queries/" QUERY_FILE_LIST = [ @@ -125,7 +127,7 @@ class WeatherBenchmark: scheme = [] 
virtual_partitions = get_local_virtual_partitions(self.partitions) data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths) - link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test) + link_base_schemes = get_partition_scheme(0, partition, self.base_paths, self.DATA_LINKS_FOLDER + test) # Match link paths to real data paths. group_size = len(data_schemes) / len(link_base_schemes) @@ -154,7 +156,7 @@ class WeatherBenchmark: local_virtual_partitions = get_local_virtual_partitions(self.partitions) virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) data_schemes = get_partition_scheme(node_index, virtual_partitions, self.base_paths) - link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, "data_links/" + test) + link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, self.DATA_LINKS_FOLDER + test) # Match link paths to real data paths. for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: @@ -181,59 +183,57 @@ class WeatherBenchmark: scheme.append([link_disk, -1, link_index, "", link_path]) return scheme - def build_data_links(self): + def build_data_links(self, reset): if (len(self.base_paths) == 0): return + if reset: + shutil.rmtree(self.base_paths[0] + self.DATA_LINKS_FOLDER) for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: for i in self.partitions: scheme = self.get_local_partition_scheme(test, i) - self.build_data_links_scheme(scheme, False) + self.build_data_links_scheme(scheme) if 1 in self.partitions and len(self.base_paths) > 1: scheme = self.build_data_links_local_zero_partition(test) - self.build_data_links_scheme(scheme, False) + self.build_data_links_scheme(scheme) elif test in self.BENCHMARK_CLUSTER_TESTS: for i in self.partitions: scheme = self.get_cluster_partition_scheme(test, i) self.build_data_links_scheme(scheme) if 1 in self.partitions and 
len(self.base_paths) > 1: scheme = self.build_data_links_cluster_zero_partition(test) - self.build_data_links_scheme(scheme, False) + self.build_data_links_scheme(scheme) else: print "Unknown test." exit() - def build_data_links_scheme(self, scheme, reset = True): - """Build all the data links based on the scheme information.""" - link_path_cleared = [] + def build_data_links_scheme(self, scheme): + '''Build all the data links based on the scheme information.''' for (data_disk, data_index, partition, data_path, link_path) in scheme: - if link_path not in link_path_cleared and os.path.isdir(link_path) and reset: - shutil.rmtree(link_path) - link_path_cleared.append(link_path) self.add_collection_links_for(data_path, link_path, data_index) def build_data_links_cluster_zero_partition(self, test): - """Build a scheme for all data in one symbolically linked folder. (0 partition)""" + '''Build a scheme for all data in one symbolically linked folder. (0 partition)''' scheme = [] - link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, self.base_paths, "data_links/" + test) + link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, self.base_paths, self.DATA_LINKS_FOLDER + test) for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: - new_link_path = self.get_zero_partition_path(link_node, "data_links/" + test + "/" + str(link_node) + "nodes") + new_link_path = self.get_zero_partition_path(link_node, self.DATA_LINKS_FOLDER + test + "/" + str(link_node) + "nodes") scheme.append([0, link_disk, 0, link_path, new_link_path]) return scheme def build_data_links_local_zero_partition(self, test): - """Build a scheme for all data in one symbolically linked folder. (0 partition)""" + '''Build a scheme for all data in one symbolically linked folder. 
(0 partition)''' scheme = [] index = 0 - link_base_schemes = get_partition_scheme(0, 1, self.base_paths, "data_links/" + test) + link_base_schemes = get_partition_scheme(0, 1, self.base_paths, self.DATA_LINKS_FOLDER + test) for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: - new_link_path = self.get_zero_partition_path(link_node, "data_links/" + test) + new_link_path = self.get_zero_partition_path(link_node, self.DATA_LINKS_FOLDER + test) scheme.append([0, index, 0, link_path, new_link_path]) index += 1 return scheme def get_zero_partition_path(self, node, key): - """Return a partition path for the zero partition.""" + '''Return a partition path for the zero partition.''' base_path = self.base_paths[0] new_link_path = get_partition_scheme(node, 1, [base_path], key)[0][PARTITION_INDEX_PATH] return new_link_path.replace("p1", "p0") @@ -282,7 +282,7 @@ class WeatherBenchmark: prepare_path(query_path, reset) # Copy query files. - new_link_path = self.get_zero_partition_path(n, "data_links/" + test + "/" + str(n) + "nodes") + new_link_path = self.get_zero_partition_path(n, self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes") self.copy_and_replace_query(query_path, [new_link_path]) for n in range(len(self.nodes)): for p in self.partitions: @@ -290,7 +290,7 @@ class WeatherBenchmark: prepare_path(query_path, reset) # Copy query files. - partition_paths = get_partition_paths(n, p, self.base_paths, "data_links/" + test + "/" + str(n) + "nodes") + partition_paths = get_partition_paths(n, p, self.base_paths, self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes") self.copy_and_replace_query(query_path, partition_paths) def copy_local_query_files(self, test, reset): @@ -301,14 +301,14 @@ class WeatherBenchmark: prepare_path(query_path, reset) # Copy query files. 
- new_link_path = self.get_zero_partition_path(0, "data_links/" + test) + new_link_path = self.get_zero_partition_path(0, self.DATA_LINKS_FOLDER + test) self.copy_and_replace_query(query_path, [new_link_path]) for p in self.partitions: query_path = get_local_query_path(self.base_paths, test, p) prepare_path(query_path, reset) # Copy query files. - partition_paths = get_partition_paths(0, p, self.base_paths, "data_links/" + test) + partition_paths = get_partition_paths(0, p, self.base_paths, self.DATA_LINKS_FOLDER + test) self.copy_and_replace_query(query_path, partition_paths) def copy_and_replace_query(self, query_path, replacement_list): @@ -327,6 +327,13 @@ class WeatherBenchmark: for line in fileinput.input(query_path + query_file, True): sys.stdout.write(line.replace(self.QUERY_REPLACEMENT_KEY + collection, replace_string)) + # Make a search replace for partition type. + if self.dataset.get_partition_type() == "large_files": + for line in fileinput.input(query_path + query_file, True): + sys.stdout.write(line.replace("/stationCollection", "/" + self.LARGE_FILE_ROOT_TAG + "/stationCollection")) + for line in fileinput.input(query_path + query_file, True): + sys.stdout.write(line.replace("/dataCollection", "/" + self.LARGE_FILE_ROOT_TAG + "/dataCollection")) + def get_number_of_slices(self): if len(self.dataset.get_tests()) == 0: print "No test has been defined in config file." 
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py index c18ac43..8ac6d17 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py @@ -209,13 +209,16 @@ def main(argv): if section == "partition_scheme": benchmark.print_partition_scheme() else: - data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) + if dataset.get_partition_type() == "large_files": + data.build_to_n_partition_files(xml_data_save_path, slices, base_paths, reset) + else: + data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) if section in ("all", "test_links"): # TODO determine current node print 'Processing the test links section (' + dataset.get_name() + ').' benchmark.print_partition_scheme() - benchmark.build_data_links() + benchmark.build_data_links(reset) if section in ("all", "queries"): print 'Processing the queries section (' + dataset.get_name() + ').' 
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py index a6513c2..80607b8 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py @@ -41,9 +41,10 @@ class WeatherConfig: for node in self.config.getElementsByTagName("dataset"): name = self.get_dataset_name(node) save_paths = self.get_dataset_save_paths(node) + partition_type = self.get_dataset_partition_type(node) partitions = self.get_dataset_partitions(node) tests = self.get_dataset_tests(node) - nodes.append(Dataset(name, save_paths, partitions, tests)) + nodes.append(Dataset(name, save_paths, partition_type, partitions, tests)) return nodes @@ -69,6 +70,9 @@ class WeatherConfig: paths.append(self.get_text(item)) return paths + def get_dataset_partition_type(self, node): + return self.get_text(node.getElementsByTagName("partition_type")[0]) + def get_dataset_partitions(self, node): paths = [] for item in node.getElementsByTagName("partitions_per_path"): @@ -103,10 +107,11 @@ class Machine: return self.id + "(" + self.ip + ")" class Dataset: - def __init__(self, name, save_paths, partitions, tests): + def __init__(self, name, save_paths, partition_type, partitions, tests): self.name = name self.save_paths = save_paths self.partitions = partitions + self.partition_type = partition_type self.tests = tests def get_name(self): @@ -118,6 +123,9 @@ class Dataset: def get_partitions(self): return self.partitions + def get_partition_type(self): + return self.partition_type + def get_tests(self): return self.tests 
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py index a4f33a1..5db090a 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py @@ -150,10 +150,6 @@ class WeatherConvertToXML: # Default return 0 - def process_one_day(self, records, report_date, page): - # Default - return 0 - def process_station_data(self, row): # Default return 0 @@ -165,7 +161,7 @@ class WeatherConvertToXML: print "Processing inventory file" file_stream = open(self.ghcnd_inventory, 'r') - csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 'TOTAL_YEARS_FOR_ALL_SENSORS'] + csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 'TOTAL_YEARS_FOR_ALL_SENSORS'] row = file_stream.readline() csv_inventory = {} for row in file_stream: @@ -243,16 +239,6 @@ class WeatherConvertToXML: def convert_c2f(self, c): return (9 / 5 * c) + 32 - def default_xml_start(self): - return textwrap.dedent("""\ - <?xml version="1.0" encoding="ISO-8859-1"?> - <ghcnd_observation version="1.0" - xmlns:xsd="http://www.w3.org/2001/XMLSchema" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <credit>NOAA's National Climatic Data Center (NCDC)</credit> - <credit_URL>http://www.ncdc.noaa.gov/</credit_URL> - """) - def default_xml_web_service_start(self): field_xml = "" field_xml += "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n" @@ -273,76 +259,6 @@ class WeatherConvertToXML: field_xml += self.get_indent_space(indent) + "<date>" + str(report_date.year) + "-" + str(report_date.month).zfill(2) + 
"-" + str(report_date.day).zfill(2) + "T00:00:00.000</date>\n" return field_xml - def get_date_from_field(self, row, field): - report_date = self.get_field_from_definition(row, field) - return str(report_date.year) + "-" + str(report_date.month).zfill(2) + "-" + str(report_date.day).zfill(2) - - def default_xml_field_date_iso8601(self, report_date): - field_xml = "" - field_xml += " <observation_date>" + self.MONTHS[report_date.month - 1] + " " + str(report_date.day) + ", " + str(report_date.year) + "</observation_date>\n" - field_xml += " <observation_date_iso8601>" + report_date.isoformat() + "</observation_date_iso8601>\n" - return field_xml - - def default_xml_field_date_year(self, year): - field_xml = "" - field_xml += " <observation_year>" + str(year) + "</observation_year>\n" - return field_xml - - def default_xml_field_date_month(self, month): - field_xml = "" - field_xml += " <observation_month>" + str(month) + "</observation_month>\n" - return field_xml - - def default_xml_field_date_day(self, day): - field_xml = "" - field_xml += " <observation_day>" + str(day) + "</observation_day>\n" - return field_xml - - def default_xml_field_station_id(self, station_id, indent=2): - field_xml = "" - field_xml += self.get_indent_space(indent) + "<station_id>" + station_id + "</station_id>\n" - return field_xml - - def default_xml_field_station(self, station_id): - station_row = "" - stations_file = open(self.ghcnd_stations, 'r') - - for line in stations_file: - if station_id == self.get_field_from_definition(line, STATIONS_FIELDS['ID']): - station_row = line - break - - field_xml = "" - field_xml += " <station_id>" + station_id + "</station_id>\n" - field_xml += " <location>\n" - field_xml += " <latitude>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['LATITUDE']).strip() + "</latitude>\n" - field_xml += " <longitude>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['LONGITUDE']).strip() + "</longitude>\n" - - elevation = 
self.get_field_from_definition(station_row, STATIONS_FIELDS['ELEVATION']).strip() - if elevation != "-999.9": - field_xml += " <elevation>" + elevation + "</elevation>\n" - - field_xml += " </location>\n" - field_xml += " <name>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['NAME']).strip() + "</name>\n" - - state = self.get_field_from_definition(station_row, STATIONS_FIELDS['STATE']) - if state.strip(): - field_xml += " <state>" + state + "</state>\n" - - gsn = self.get_field_from_definition(station_row, STATIONS_FIELDS['GSNFLAG']) - if gsn.strip(): - field_xml += " <gsn />\n" - - hcn = self.get_field_from_definition(station_row, STATIONS_FIELDS['HCNFLAG']) - if hcn.strip(): - field_xml += " <hcn />\n" - - wmoid = self.get_field_from_definition(station_row, STATIONS_FIELDS['WMOID']) - if wmoid.strip(): - field_xml += " <wmoid id=\"" + wmoid + "\" />\n" - - return field_xml - def default_xml_mshr_station_additional(self, station_id): """The web service station data is generate from the MSHR data supplemented with GHCN-Daily.""" station_mshr_row = "" @@ -492,120 +408,6 @@ class WeatherConvertToXML: def get_indent_space(self, indent): return (" " * (4 * indent)) -class WeatherDailyXMLFile(WeatherConvertToXML): - def process_one_month_sensor_set(self, records, page): - year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR)) - month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH)) - - station_id = self.get_dly_field(records[0], DLY_FIELD_ID) - - count = 0 - for day in range(1, 32): - try: - # TODO find out what is a valid python date range? 1889? - # Attempt to see if this is valid date. 
- report_date = date(year, month, day) - save_file_name = self.process_one_day(records, report_date, page) - if save_file_name is not "": - count = count + 1 - if self.debug_output: - print "Wrote file: " + save_file_name - except ValueError: - if self.debug_output: - print "Error: Not a valid date (" + str(month) + "/" + str(day) + "/" + str(year) + ") for " + station_id + "." - pass - return count - - def process_one_day(self, records, report_date, page): - station_id = self.get_dly_field(records[0], DLY_FIELD_ID) - found_data = False - - # Information for each daily file. - daily_xml_file = self.default_xml_start() - daily_xml_file += self.default_xml_field_station(station_id) - daily_xml_file += self.default_xml_field_date_iso8601(report_date) - daily_xml_file += self.default_xml_start_tag("sensors") - for record in records: - record_xml_snip = self.default_xml_day_reading_as_field(record, report_date.day) - if record_xml_snip is not "": - found_data = True - daily_xml_file += record_xml_snip - daily_xml_file += self.default_xml_end_tag("sensors") - daily_xml_file += self.default_xml_end() - - if not found_data: - return "" - - # Make sure the station folder is available. - ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + station_id + "/" + str(report_date.year) + "/" - if not os.path.isdir(ghcnd_xml_station_path): - os.makedirs(ghcnd_xml_station_path) - - # Save XML string to disk. - save_file_name = ghcnd_xml_station_path + build_sensor_save_filename(station_id, report_date, page) - save_file_name = self.save_file(save_file_name, daily_xml_file) - - return save_file_name - -class WeatherMonthlyXMLFile(WeatherConvertToXML): - def process_one_month_sensor_set(self, records, page): - found_data = False - year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR)) - month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH)) - - station_id = self.get_dly_field(records[0], DLY_FIELD_ID) - - # Information for each daily file. 
- daily_xml_file = self.default_xml_start() - daily_xml_file += self.default_xml_field_station(station_id) - daily_xml_file += self.default_xml_field_date_year(year) - daily_xml_file += self.default_xml_field_date_month(month) - daily_xml_file += self.default_xml_start_tag("readings") - - for day in range(1, 32): - try: - # TODO find out what is a valid python date range? 1889? - # Attempt to see if this is valid date. - report_date = date(year, month, day) - found_daily_data = False - record_xml_snip = "" - - for record in records: - record_xml_snip += self.default_xml_day_reading_as_field(record, report_date.day) - if record_xml_snip is not "": - found_data = True - found_daily_data = True - - if found_daily_data: - daily_xml_file += self.default_xml_start_tag("reading", 2) - daily_xml_file += self.default_xml_field_date_day(day) - daily_xml_file += record_xml_snip - daily_xml_file += self.default_xml_end_tag("reading", 2) - - except ValueError: - pass - - daily_xml_file += self.default_xml_end_tag("readings") - daily_xml_file += self.default_xml_end() - - if not found_data: - return 0 - - # Make sure the station folder is available. - ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + station_id + "/" + str(report_date.year) + "/" - if not os.path.isdir(ghcnd_xml_station_path): - os.makedirs(ghcnd_xml_station_path) - - # Save XML string to disk. 
- save_file_name = ghcnd_xml_station_path + build_sensor_save_filename(station_id, report_date, page) - save_file_name = self.save_file(save_file_name, daily_xml_file) - - if save_file_name is not "": - if self.debug_output: - print "Wrote file: " + save_file_name - return 1 - else: - return 0 class WeatherWebServiceMonthlyXMLFile(WeatherConvertToXML): """The web service class details how to create files similar to the NOAA web service.""" http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py index a7fb691..1c9f129 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py @@ -26,6 +26,8 @@ from collections import OrderedDict # Allows partition and picking up where you left off. class WeatherDataFiles: + LARGE_FILE_ROOT_TAG = "root" + INDEX_DATA_FILE_NAME = 0 INDEX_DATA_SENSORS_STATUS = 1 INDEX_DATA_STATION_STATUS = 2 @@ -51,14 +53,10 @@ class WeatherDataFiles: self.current = self.DATA_FILE_START_INDEX self.progress_data = [] - - def get_file_list(self): - return glob.glob(self.base_path + "/*" + self.DATA_FILE_EXTENSION) - def get_file_list_iterator(self): + """Return the list of files one at a time.""" return glob.iglob(self.base_path + "/*" + self.DATA_FILE_EXTENSION) - # Save Functions def build_progress_file(self, options, convert): if not os.path.isfile(self.progress_file_name) or 'reset' in options: @@ -98,10 +96,8 @@ class WeatherDataFiles: return # Initialize the partition paths. 
- partition_sizes = [] partition_paths = get_partition_paths(0, partitions, base_paths) for path in partition_paths: - partition_sizes.append(0) # Make sure the xml folder is available. prepare_path(path, reset) @@ -145,66 +141,72 @@ class WeatherDataFiles: if current_station_partition >= len(partition_paths): current_station_partition = 0 - - def copy_to_n_partitions_by_station(self, save_path, partitions, base_paths, reset): - """Once the initial data has been generated, the data can be copied into a set number of partitions. """ + def build_to_n_partition_files(self, save_path, partitions, base_paths, reset): + """Once the initial data has been generated, the data can be divided into partitions + and stored in single files. + """ if (len(base_paths) == 0): return + XML_START = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>" + # Initialize the partition paths. - partition_sizes = [] partition_paths = get_partition_paths(0, partitions, base_paths) + sensors_partition_files = [] + stations_partition_files = [] for path in partition_paths: - partition_sizes.append(0) # Make sure the xml folder is available. prepare_path(path, reset) + prepare_path(path + "sensors/", False) + prepare_path(path + "stations/", False) + sensors_partition_files.append(open(path + "sensors/partition.xml", 'w')) + stations_partition_files.append(open(path + "stations/partition.xml", 'w')) + + for row in range(0, len(partition_paths)): + sensors_partition_files[row].write(XML_START + "<" + self.LARGE_FILE_ROOT_TAG + ">\n") + stations_partition_files[row].write(XML_START + "<" + self.LARGE_FILE_ROOT_TAG + ">\n") - # copy stations and sensors into each partition - current_partition = 0 - csv_sorted = self.get_csv_in_partition_order() - for item, size in csv_sorted.iteritems(): - if size < 0: - print "The progress file does not have the sensor size data saved." - return - - station_id = item.split('.')[0] - # Update partition bases on smallest current size. 
- current_partition = partition_sizes.index(min(partition_sizes)) - - # Copy sensor files - type = "sensors" - file_path = build_base_save_folder(save_path, station_id, type) + station_id - new_file_path = build_base_save_folder(partition_paths[current_partition], station_id, type) + station_id - if os.path.isdir(file_path): - distutils.dir_util.copy_tree(file_path, new_file_path) - partition_sizes[current_partition] += size + import fnmatch + import os - # Copy station files - type = "stations" - file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml" - new_file_base = build_base_save_folder(partition_paths[current_partition], station_id, type) - new_file_path = new_file_base + station_id + ".xml" - if os.path.isfile(file_path): - if not os.path.isdir(new_file_base): - os.makedirs(new_file_base) - shutil.copyfile(file_path, new_file_path) - - def get_csv_in_partition_order(self): + # copy stations and sensors into each partition + current_sensor_partition = 0 + current_station_partition = 0 self.open_progress_data() row_count = len(self.progress_data) - - # Get the dictionary of all the files and data sizes. 
- csv_dict = dict() for row in range(0, row_count): row_contents = self.progress_data[row].rsplit(self.SEPERATOR) file_name = row_contents[self.INDEX_DATA_FILE_NAME] - folder_data = int(row_contents[self.INDEX_DATA_FOLDER_DATA]) + station_id = os.path.basename(file_name).split('.')[0] + + # Copy sensor files + type = "sensors" + file_path = build_base_save_folder(save_path, station_id, type) + station_id + for root, dirnames, filenames in os.walk(file_path): + for filename in fnmatch.filter(filenames, '*.xml'): + xml_path = os.path.join(root, filename) + xml_data = file_get_contents(xml_path).replace(XML_START, "") + "\n" + sensors_partition_files[current_sensor_partition].write(xml_data) + current_sensor_partition += 1 + if current_sensor_partition >= len(sensors_partition_files): + current_sensor_partition = 0 - csv_dict[file_name] = folder_data - - # New sorted list. - return OrderedDict(sorted(csv_dict.items(), key=lambda x: x[1], reverse=True)) - + # Copy station files + type = "stations" + file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml" + xml_path = os.path.join(root, file_path) + xml_data = file_get_contents(xml_path).replace(XML_START, "") + "\n" + stations_partition_files[current_station_partition].write(xml_data) + current_station_partition += 1 + if current_station_partition >= len(partition_paths): + current_station_partition = 0 + + for row in range(0, len(partition_paths)): + sensors_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG + ">\n") + sensors_partition_files[row].close() + stations_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG + ">\n") + stations_partition_files[row].close() + def get_file_row(self, file_name): for i in range(0, len(self.progress_data)): if self.progress_data[i].startswith(file_name): @@ -405,3 +407,6 @@ def prepare_path(path, reset): if not os.path.isdir(path): os.makedirs(path) +def file_get_contents(filename): + with open(filename) as f: + return f.read()
