Updated the method that partitions the weather data. Partitioning is now done per XML file: each file is assigned to the next partition in round-robin order.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/ec0e5573 Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/ec0e5573 Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/ec0e5573 Branch: refs/heads/prestonc/hash_join Commit: ec0e5573d14a0efe6afa5d7415f3298aa54608ea Parents: bb1ec57 Author: Preston Carman <[email protected]> Authored: Tue Mar 25 16:39:22 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Tue Mar 25 16:39:22 2014 -0700 ---------------------------------------------------------------------- .../scripts/weather_benchmark.py | 2 +- .../scripts/weather_data_files.py | 54 ++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ec0e5573/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py index 2077c10..be95ef8 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py @@ -140,7 +140,7 @@ class WeatherBenchmark: print "Unknown test." 
return group_size = group_size / link_virtual - node_offset = group_size * (node_index * local_virtual_partitions) + node_offset = group_size * (node_index * partition) node_offset += group_size * link_index has_data = True if link_node < node_index: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ec0e5573/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py index 64e19d6..a7fb691 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py @@ -105,6 +105,60 @@ class WeatherDataFiles: # Make sure the xml folder is available. prepare_path(path, reset) + import fnmatch + import os + + # copy stations and sensors into each partition + current_sensor_partition = 0 + current_station_partition = 0 + self.open_progress_data() + row_count = len(self.progress_data) + for row in range(0, row_count): + row_contents = self.progress_data[row].rsplit(self.SEPERATOR) + file_name = row_contents[self.INDEX_DATA_FILE_NAME] + station_id = os.path.basename(file_name).split('.')[0] + + # Copy sensor files + type = "sensors" + file_path = build_base_save_folder(save_path, station_id, type) + station_id + for root, dirnames, filenames in os.walk(file_path): + for filename in fnmatch.filter(filenames, '*.xml'): + xml_path = os.path.join(root, filename) + new_file_base = build_base_save_folder(partition_paths[current_sensor_partition], station_id, type) + station_id + if not os.path.isdir(new_file_base): + os.makedirs(new_file_base) + shutil.copyfile(xml_path, new_file_base + "/" + filename) + current_sensor_partition += 1 + if current_sensor_partition >= len(partition_paths): 
+ current_sensor_partition = 0 + + # Copy station files + type = "stations" + file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml" + new_file_base = build_base_save_folder(partition_paths[current_station_partition], station_id, type) + new_file_path = new_file_base + station_id + ".xml" + if os.path.isfile(file_path): + if not os.path.isdir(new_file_base): + os.makedirs(new_file_base) + shutil.copyfile(file_path, new_file_path) + current_station_partition += 1 + if current_station_partition >= len(partition_paths): + current_station_partition = 0 + + + def copy_to_n_partitions_by_station(self, save_path, partitions, base_paths, reset): + """Once the initial data has been generated, the data can be copied into a set number of partitions. """ + if (len(base_paths) == 0): + return + + # Initialize the partition paths. + partition_sizes = [] + partition_paths = get_partition_paths(0, partitions, base_paths) + for path in partition_paths: + partition_sizes.append(0) + # Make sure the xml folder is available. + prepare_path(path, reset) + # copy stations and sensors into each partition current_partition = 0 csv_sorted = self.get_csv_in_partition_order()
