Repository: incubator-vxquery Updated Branches: refs/heads/prestonc/benchmark [created] cf09bd9ae
Added new support queries and removed unused parameter. Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/49ce42cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/49ce42cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/49ce42cb Branch: refs/heads/prestonc/benchmark Commit: 49ce42cb2825eab4c6d683d61a44e05c9758d2cd Parents: fe6e05b Author: Preston Carman <[email protected]> Authored: Tue Apr 22 16:37:23 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Thu May 8 14:15:34 2014 -0700 ---------------------------------------------------------------------- .../scripts/weather_benchmark.py | 85 ++++++++++++++++---- .../noaa-ghcn-daily/scripts/weather_cli.py | 6 +- 2 files changed, 74 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/49ce42cb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py index 2e04764..dead516 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py @@ -58,7 +58,10 @@ class WeatherBenchmark: "q06_station.xq", "q07_join_count.xq", "q07_tmin.xq", - "q07_tmax.xq" + "q07_tmin_values.xq", + "q07_tmin_self.xq", + "q07_tmax.xq", + "q07_tmax_values.xq" ] BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"] BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"] @@ -72,30 +75,30 @@ class WeatherBenchmark: self.dataset = dataset self.nodes = nodes - def print_partition_scheme(self, xml_save_path): + def print_partition_scheme(self): if (len(self.base_paths) == 0): return for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: - self.print_local_partition_schemes(test, xml_save_path) + self.print_local_partition_schemes(test) elif test in self.BENCHMARK_CLUSTER_TESTS: - self.print_cluster_partition_schemes(test, xml_save_path) + self.print_cluster_partition_schemes(test) else: print "Unknown test." exit() - def print_local_partition_schemes(self, test, xml_save_path): + def print_local_partition_schemes(self, test): node_index = 0 virtual_partitions = get_local_virtual_partitions(self.partitions) for p in self.partitions: - scheme = self.get_local_partition_scheme(test, xml_save_path, p) + scheme = self.get_local_partition_scheme(test, p) self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index) - def print_cluster_partition_schemes(self, test, xml_save_path): + def print_cluster_partition_schemes(self, test): node_index = self.get_current_node_index() virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) for p in self.partitions: - scheme = self.get_cluster_partition_scheme(test, xml_save_path, p) + scheme = self.get_cluster_partition_scheme(test, p) self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index) def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id): @@ -118,7 +121,7 @@ class WeatherBenchmark: else: print " Scheme is EMPTY." - def get_local_partition_scheme(self, test, xml_save_path, partition): + def get_local_partition_scheme(self, test, partition): scheme = [] virtual_partitions = get_local_virtual_partitions(self.partitions) data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths) @@ -141,7 +144,7 @@ class WeatherBenchmark: offset += group_size return scheme - def get_cluster_partition_scheme(self, test, xml_save_path, partition): + def get_cluster_partition_scheme(self, test, partition): node_index = self.get_current_node_index() if node_index == -1: print "Unknown host." @@ -169,6 +172,7 @@ class WeatherBenchmark: has_data = True if link_node < node_index: has_data = False + # Make links for date_node, data_disk, data_virtual, data_index, data_path in data_schemes: if has_data and data_disk == link_disk \ @@ -177,17 +181,23 @@ class WeatherBenchmark: scheme.append([link_disk, -1, link_index, "", link_path]) return scheme - def build_data_links(self, xml_save_path): + def build_data_links(self): if (len(self.base_paths) == 0): return for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: + if 1 in self.partitions and len(self.base_paths) > 1: + scheme = self.build_data_links_local_zero_partition(test) + self.build_data_links_scheme(scheme) for i in self.partitions: - scheme = self.get_local_partition_scheme(test, xml_save_path, i) + scheme = self.get_local_partition_scheme(test, i) self.build_data_links_scheme(scheme) elif test in self.BENCHMARK_CLUSTER_TESTS: + if 1 in self.partitions and len(self.base_paths) > 1: + scheme = self.build_data_links_cluster_zero_partition(test) + self.build_data_links_scheme(scheme) for i in self.partitions: - scheme = self.get_cluster_partition_scheme(test, xml_save_path, i) + scheme = self.get_cluster_partition_scheme(test, i) self.build_data_links_scheme(scheme) else: print "Unknown test." @@ -202,6 +212,39 @@ class WeatherBenchmark: link_path_cleared.append(link_path) self.add_collection_links_for(data_path, link_path, data_index) + def build_data_links_cluster_zero_partition(self, test): + """Build a scheme for all data in one symbolically linked folder. (0 partition)""" + scheme = [] + index = 0 + current_node = 0 + link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, self.base_paths, "data_links/" + test) + for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: + new_link_path = self.get_zero_partition_path(link_node, "data_links/" + test + "/" + str(link_node) + "nodes") + scheme.append([0, index, 0, link_path, new_link_path]) + if current_node is not link_node: + current_node = link_node + index = 0 + else: + index += 1 + return scheme + + def build_data_links_local_zero_partition(self, test): + """Build a scheme for all data in one symbolically linked folder. (0 partition)""" + scheme = [] + index = 0 + link_base_schemes = get_partition_scheme(0, 1, self.base_paths, "data_links/" + test) + for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: + new_link_path = self.get_zero_partition_path(link_node, "data_links/" + test) + scheme.append([0, index, 0, link_path, new_link_path]) + index += 1 + return scheme + + def get_zero_partition_path(self, node, key): + """Return a partition path for the zero partition.""" + base_path = self.base_paths[0] + new_link_path = get_partition_scheme(node, 1, [base_path], key)[0][PARTITION_INDEX_PATH] + return new_link_path.replace("p1", "p0") + def get_current_node_index(self): found = False node_index = 0 @@ -237,7 +280,14 @@ class WeatherBenchmark: def copy_cluster_query_files(self, test, reset): '''Determine the data_link path for cluster query files and copy with new location for collection.''' - partitions = self.dataset.get_partitions()[0] + if 1 in self.partitions and len(self.base_paths) > 1: + for n in range(len(self.nodes)): + query_path = get_cluster_query_path(self.base_paths, test, 0, n) + prepare_path(query_path, reset) + + # Copy query files. + new_link_path = self.get_zero_partition_path(n, "data_links/" + test + "/" + str(n) + "nodes") + self.copy_and_replace_query(query_path, [new_link_path]) for n in range(len(self.nodes)): for p in self.partitions: query_path = get_cluster_query_path(self.base_paths, test, p, n) @@ -250,6 +300,13 @@ class WeatherBenchmark: def copy_local_query_files(self, test, reset): '''Determine the data_link path for local query files and copy with new location for collection.''' + if 1 in self.partitions and len(self.base_paths) > 1: + query_path = get_local_query_path(self.base_paths, test, 0) + prepare_path(query_path, reset) + + # Copy query files. + new_link_path = self.get_zero_partition_path(0, "data_links/" + test) + self.copy_and_replace_query(query_path, [new_link_path]) for p in self.partitions: query_path = get_local_query_path(self.base_paths, test, p) prepare_path(query_path, reset) http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/49ce42cb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py index 5bfa698..c18ac43 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py @@ -207,15 +207,15 @@ def main(argv): print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').' data.reset() if section == "partition_scheme": - benchmark.print_partition_scheme(xml_data_save_path) + benchmark.print_partition_scheme() else: data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) if section in ("all", "test_links"): # TODO determine current node print 'Processing the test links section (' + dataset.get_name() + ').' - benchmark.print_partition_scheme(xml_data_save_path) - benchmark.build_data_links(xml_data_save_path) + benchmark.print_partition_scheme() + benchmark.build_data_links() if section in ("all", "queries"): print 'Processing the queries section (' + dataset.get_name() + ').'
