Changing benchmark scripts to support local partitions on a cluster.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/ea297e72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/ea297e72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/ea297e72 Branch: refs/heads/prestonc/hash_join Commit: ea297e72bd00d15649fa47991833d114c93c3821 Parents: 0b1033f Author: Preston Carman <[email protected]> Authored: Mon Mar 17 18:08:54 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Tue Apr 1 20:56:25 2014 -0700 ---------------------------------------------------------------------- .../scripts/weather_benchmark.py | 104 +++++++++---------- .../scripts/weather_data_files.py | 21 ++-- 2 files changed, 60 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ea297e72/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py index 6374d38..fdd08ec 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py @@ -61,28 +61,32 @@ class WeatherBenchmark: exit() def print_local_partition_schemes(self, test, xml_save_path): + node_index = 0 + virtual_partitions = get_local_virtual_partitions(self.partitions) for i in self.partitions: scheme = self.get_local_partition_scheme(test, xml_save_path, i) - virtual_partitions = get_local_virtual_partitions(self.partitions) - self.print_partition_schemes(virtual_partitions, scheme, test, i) + self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index) def print_cluster_partition_schemes(self, test, xml_save_path): - scheme = self.get_cluster_partition_scheme(test, xml_save_path) + node_index = self.get_current_node_index() virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) - self.print_partition_schemes(virtual_partitions, scheme, test, 0) + for i in self.partitions: + scheme = self.get_cluster_partition_scheme(test, xml_save_path, i) + self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index) - def print_partition_schemes(self, virtual_partitions, scheme, test, partitions): + def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id): print print "---------------- Partition Scheme --------------------" print " Test: " + test print " Virtual Partitions: " + str(virtual_partitions) print " Disks: " + str(len(self.base_paths)) print " Partitions: " + str(partitions) + print " Node Id: " + str(node_id) if len(scheme) > 0: folder_length = len(scheme[0][3]) + 5 - row_format = "{:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}" - HEADER = ("Index", "Link", "Data Path", "Link Path") + row_format = "{:>5} {:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}" + HEADER = ("Disk", "Index", "Link", "Data Path", "Link Path") print row_format.format(*HEADER) for row in scheme: print row_format.format(*row) @@ -93,14 +97,14 @@ class WeatherBenchmark: def get_local_partition_scheme(self, test, xml_save_path, partition): scheme = [] virtual_partitions = get_local_virtual_partitions(self.partitions) - data_schems = get_partition_scheme(virtual_partitions, self.base_paths) + data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths) - link_base_schemes = get_partition_scheme(partition, self.base_paths, "data_links/" + test) + link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test) # Match link paths to real data paths. offset = 0 group_size = len(data_schemes) / len(link_base_schemes) - for link_disk, link_virtual, link_index, link_path in enumerate(link_base_schemes): - for data_disk, data_virtual, data_index, data_path in enumerate(data_schemes): + for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: + for data_node, data_disk, data_virtual, data_index, data_path in data_schemes: if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size: scheme.append([data_index, link_index, data_path, link_path]) elif test == "local_batch_scale_out" and data_index == link_index: @@ -108,43 +112,39 @@ class WeatherBenchmark: offset += group_size return scheme - def get_cluster_partition_scheme(self, test, xml_save_path): + def get_cluster_partition_scheme(self, test, xml_save_path, partition): node_index = self.get_current_node_index() if node_index == -1: print "Unknown host." return scheme = [] + local_virtual_partitions = get_local_virtual_partitions(self.partitions) virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) - data_paths = get_partition_paths(virtual_partitions, self.base_paths) - link_base_paths = get_cluster_link_paths(len(self.nodes), self.base_paths, "data_links/" + test) + data_schemes = get_partition_scheme(node_index, virtual_partitions, self.base_paths) + link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, "data_links/" + test) # Match link paths to real data paths. - link_base_paths.sort() - for link_index, link_path in enumerate(link_base_paths): + for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: # Prep - link_offset = link_index % len(self.nodes) - disk_offset = link_index // len(self.nodes) if test == "speed_up": - group_size = len(data_paths) / (link_offset + 1) / (len(self.base_paths)) + group_size = virtual_partitions / (link_node + 1) elif test == "batch_scale_out": - group_size = len(data_paths) / len(self.nodes) / (len(self.base_paths)) + group_size = virtual_partitions / len(self.nodes) else: print "Unknown test." return - node_offset = group_size * node_index - for j in range(disk_offset): - node_offset += len(data_paths) / (len(self.base_paths)) + group_size = group_size / link_virtual + node_offset = group_size * (node_index * local_virtual_partitions) + node_offset += group_size * link_index has_data = True - if link_offset < node_index: + if link_node < node_index: has_data = False - # Make links - data_paths.sort() - for data_index, data_path in enumerate(data_paths): - if has_data and node_offset <= data_index and data_index < node_offset + group_size: - scheme.append([data_index, link_index, data_path, link_path]) - scheme.append([-1, link_index, "", link_path]) + for date_node, data_disk, data_virtual, data_index, data_path in data_schemes: + if has_data and data_disk == link_disk and node_offset <= data_index and data_index < node_offset + group_size: + scheme.append([link_disk, data_index, link_index, data_path, link_path]) + scheme.append([link_disk, -1, link_index, "", link_path]) return scheme def build_data_links(self, xml_save_path): @@ -156,8 +156,9 @@ class WeatherBenchmark: scheme = self.get_local_partition_scheme(test, xml_save_path, i) self.build_data_links_scheme(scheme) elif test in self.BENCHMARK_CLUSTER_TESTS: - scheme = self.get_cluster_partition_scheme(test, xml_save_path) - self.build_data_links_scheme(scheme) + for i in self.partitions: + scheme = self.get_cluster_partition_scheme(test, xml_save_path, i) + self.build_data_links_scheme(scheme) else: print "Unknown test." exit() @@ -207,13 +208,14 @@ class WeatherBenchmark: '''Determine the data_link path for cluster query files and copy with new location for collection.''' partitions = self.dataset.get_partitions()[0] - for i in range(len(self.nodes)): - query_path = get_cluster_query_path(self.base_paths, test, i) - prepare_path(query_path, reset) - - # Copy query files. - partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test) - self.copy_and_replace_query(query_path, partition_paths) + for i in self.partitions: + for j in range(len(self.nodes)): + query_path = get_cluster_query_path(self.base_paths, test, i, j) + prepare_path(query_path, reset) + + # Copy query files. + partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test) + self.copy_and_replace_query(query_path, partition_paths) def copy_local_query_files(self, test, reset): '''Determine the data_link path for local query files and copy with @@ -223,7 +225,7 @@ class WeatherBenchmark: prepare_path(query_path, reset) # Copy query files. - partition_paths = get_partition_paths(i, self.base_paths, "data_links/" + test) + partition_paths = get_partition_paths(0, i, self.base_paths, "data_links/" + test) self.copy_and_replace_query(query_path, partition_paths) def copy_and_replace_query(self, query_path, replacement_list): @@ -255,34 +257,26 @@ class WeatherBenchmark: print "Unknown test." exit() -def get_cluster_link_paths(nodes, base_paths, key="partitions"): +def get_cluster_link_scheme(nodes, partition, base_paths, key="partitions"): link_paths = [] for i in range(0, nodes): - new_link_path = get_cluster_link_paths_for_node(i, base_paths, key) + new_link_path = get_partition_scheme(i, partition, base_paths, key + "/" + str(i) + "nodes") link_paths.extend(new_link_path) return link_paths -def get_cluster_link_paths_for_node(node_id, base_paths, key="partitions"): - link_paths = [] - for j in range(0, len(base_paths)): - new_link_path = base_paths[j] + key + "/" + str(node_id) + "nodes/" - link_paths.append(new_link_path) - return link_paths - def get_local_query_path(base_paths, test, partition): return base_paths[0] + "queries/" + test + "/" + get_local_query_folder(len(base_paths), partition) + "/" def get_local_query_folder(disks, partitions): return "d" + str(disks) + "_p" + str(partitions) -def get_cluster_query_path(base_paths, test, nodes): - return base_paths[0] + "queries/" + test + "/" + str(nodes) + "nodes/" +def get_cluster_query_path(base_paths, test, partition, nodes): + return base_paths[0] + "queries/" + test + "/" + str(nodes) + "nodes/" + get_local_query_folder(len(base_paths), partition) + "/" def get_cluster_virtual_partitions(nodes, partitions): - if len(partitions) != 1: - print "Cluster configurations must only have one partition." - exit() - return calculate_partitions(range(len(nodes), 0, -1)) + vp = get_local_virtual_partitions(partitions) + vn = calculate_partitions(range(len(nodes), 0, -1)) + return vp * vn def get_local_virtual_partitions(partitions): return calculate_partitions(partitions) http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ea297e72/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py index a68cba1..64e19d6 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py @@ -99,7 +99,7 @@ class WeatherDataFiles: # Initialize the partition paths. partition_sizes = [] - partition_paths = get_partition_paths(partitions, base_paths) + partition_paths = get_partition_paths(0, partitions, base_paths) for path in partition_paths: partition_sizes.append(0) # Make sure the xml folder is available. @@ -319,24 +319,25 @@ class WeatherDataFiles: # Index values of each field details. -PARTITION_INDEX_DISK = 0 -PARTITION_INDEX_VIRTUAL = 1 -PARTITION_INDEX = 2 -PARTITION_INDEX_PATH = 3 -PARTITION_HEADER = ("Disk", "Virtual", "Index", "Path") +PARTITION_INDEX_NODE = 0 +PARTITION_INDEX_DISK = 1 +PARTITION_INDEX_VIRTUAL = 2 +PARTITION_INDEX = 3 +PARTITION_INDEX_PATH = 4 +PARTITION_HEADER = ("Node", "Disk", "Virtual", "Index", "Path") -def get_partition_paths(partitions, base_paths, key="partitions"): +def get_partition_paths(node_id, partitions, base_paths, key="partitions"): partition_paths = [] - for scheme in get_partition_scheme(partitions, base_paths, key): + for scheme in get_partition_scheme(node_id, partitions, base_paths, key): partition_paths.append(scheme[PARTITION_INDEX_PATH]) return partition_paths -def get_partition_scheme(partitions, base_paths, key="partitions"): +def get_partition_scheme(node_id, partitions, base_paths, key="partitions"): partition_scheme = [] for i in range(0, partitions): for j in range(0, len(base_paths)): new_partition_path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/" - partition_scheme.append((j, partitions, i, new_partition_path)) + partition_scheme.append((node_id, j, partitions, i, new_partition_path)) return partition_scheme def get_partition_folder(disks, partitions, index):
