Repository: incubator-vxquery
Updated Branches:
  refs/heads/master 9e2360836 -> d6c52f7c5


Added new support queries and removed unused parameter.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/49ce42cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/49ce42cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/49ce42cb

Branch: refs/heads/master
Commit: 49ce42cb2825eab4c6d683d61a44e05c9758d2cd
Parents: fe6e05b
Author: Preston Carman <[email protected]>
Authored: Tue Apr 22 16:37:23 2014 -0700
Committer: Preston Carman <[email protected]>
Committed: Thu May 8 14:15:34 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                | 85 ++++++++++++++++----
 .../noaa-ghcn-daily/scripts/weather_cli.py      |  6 +-
 2 files changed, 74 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/49ce42cb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 2e04764..dead516 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -58,7 +58,10 @@ class WeatherBenchmark:
                           "q06_station.xq",
                           "q07_join_count.xq",
                           "q07_tmin.xq",
-                          "q07_tmax.xq"
+                          "q07_tmin_values.xq",
+                          "q07_tmin_self.xq",
+                          "q07_tmax.xq",
+                          "q07_tmax_values.xq"
                           ] 
     BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"] 
     BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"] 
@@ -72,30 +75,30 @@ class WeatherBenchmark:
         self.dataset = dataset
         self.nodes = nodes
         
-    def print_partition_scheme(self, xml_save_path):
+    def print_partition_scheme(self):
         if (len(self.base_paths) == 0):
             return
         for test in self.dataset.get_tests():
             if test in self.BENCHMARK_LOCAL_TESTS:
-                self.print_local_partition_schemes(test, xml_save_path)
+                self.print_local_partition_schemes(test)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
-                self.print_cluster_partition_schemes(test, xml_save_path)
+                self.print_cluster_partition_schemes(test)
             else:
                 print "Unknown test."
                 exit()
             
-    def print_local_partition_schemes(self, test, xml_save_path):
+    def print_local_partition_schemes(self, test):
         node_index = 0
         virtual_partitions = get_local_virtual_partitions(self.partitions)
         for p in self.partitions:
-            scheme = self.get_local_partition_scheme(test, xml_save_path, p)
+            scheme = self.get_local_partition_scheme(test, p)
             self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index)
         
-    def print_cluster_partition_schemes(self, test, xml_save_path):
+    def print_cluster_partition_schemes(self, test):
         node_index = self.get_current_node_index()
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
         for p in self.partitions:
-            scheme = self.get_cluster_partition_scheme(test, xml_save_path, p)
+            scheme = self.get_cluster_partition_scheme(test, p)
             self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index)
         
     def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id):
@@ -118,7 +121,7 @@ class WeatherBenchmark:
         else:
             print "    Scheme is EMPTY."
 
-    def get_local_partition_scheme(self, test, xml_save_path, partition):
+    def get_local_partition_scheme(self, test, partition):
         scheme = []
         virtual_partitions = get_local_virtual_partitions(self.partitions)
         data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths)
@@ -141,7 +144,7 @@ class WeatherBenchmark:
                     offset += group_size
         return scheme
     
-    def get_cluster_partition_scheme(self, test, xml_save_path, partition):
+    def get_cluster_partition_scheme(self, test, partition):
         node_index = self.get_current_node_index()
         if node_index == -1:
             print "Unknown host."
@@ -169,6 +172,7 @@ class WeatherBenchmark:
             has_data = True
             if link_node < node_index:
                 has_data = False
+                
             # Make links
             for date_node, data_disk, data_virtual, data_index, data_path in data_schemes:
                 if has_data and data_disk == link_disk \
@@ -177,17 +181,23 @@ class WeatherBenchmark:
             scheme.append([link_disk, -1, link_index, "", link_path])
         return scheme
     
-    def build_data_links(self, xml_save_path):
+    def build_data_links(self):
         if (len(self.base_paths) == 0):
             return
         for test in self.dataset.get_tests():
             if test in self.BENCHMARK_LOCAL_TESTS:
+                if 1 in self.partitions and len(self.base_paths) > 1:
+                    scheme = self.build_data_links_local_zero_partition(test)
+                    self.build_data_links_scheme(scheme)
                 for i in self.partitions:
-                    scheme = self.get_local_partition_scheme(test, xml_save_path, i)
+                    scheme = self.get_local_partition_scheme(test, i)
                     self.build_data_links_scheme(scheme)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
+                if 1 in self.partitions and len(self.base_paths) > 1:
+                    scheme = self.build_data_links_cluster_zero_partition(test)
+                    self.build_data_links_scheme(scheme)
                 for i in self.partitions:
-                    scheme = self.get_cluster_partition_scheme(test, xml_save_path, i)
+                    scheme = self.get_cluster_partition_scheme(test, i)
                     self.build_data_links_scheme(scheme)
             else:
                 print "Unknown test."
@@ -202,6 +212,39 @@ class WeatherBenchmark:
                 link_path_cleared.append(link_path)
             self.add_collection_links_for(data_path, link_path, data_index)
     
+    def build_data_links_cluster_zero_partition(self, test):
+        """Build a scheme for all data in one symbolically linked folder. (0 partition)"""
+        scheme = []
+        index = 0
+        current_node = 0
+        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, self.base_paths, "data_links/" + test)
+        for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
+            new_link_path = self.get_zero_partition_path(link_node, "data_links/" + test + "/" + str(link_node) + "nodes")
+            scheme.append([0, index, 0, link_path, new_link_path])
+            if current_node is not link_node:
+                current_node = link_node
+                index = 0
+            else:
+                index += 1
+        return scheme
+
+    def build_data_links_local_zero_partition(self, test):
+        """Build a scheme for all data in one symbolically linked folder. (0 partition)"""
+        scheme = []
+        index = 0
+        link_base_schemes = get_partition_scheme(0, 1, self.base_paths, "data_links/" + test)
+        for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
+            new_link_path = self.get_zero_partition_path(link_node, "data_links/" + test)
+            scheme.append([0, index, 0, link_path, new_link_path])
+            index += 1
+        return scheme
+
+    def get_zero_partition_path(self, node, key):
+        """Return a partition path for the zero partition."""
+        base_path = self.base_paths[0]
+        new_link_path = get_partition_scheme(node, 1, [base_path], key)[0][PARTITION_INDEX_PATH]
+        return new_link_path.replace("p1", "p0")
+        
     def get_current_node_index(self):
         found = False
         node_index = 0
@@ -237,7 +280,14 @@ class WeatherBenchmark:
     def copy_cluster_query_files(self, test, reset):
         '''Determine the data_link path for cluster query files and copy with
         new location for collection.'''
-        partitions = self.dataset.get_partitions()[0]
+        if 1 in self.partitions and len(self.base_paths) > 1:
+            for n in range(len(self.nodes)):
+                query_path = get_cluster_query_path(self.base_paths, test, 0, n)
+                prepare_path(query_path, reset)
+            
+                # Copy query files.
+                new_link_path = self.get_zero_partition_path(n, "data_links/" + test + "/" + str(n) + "nodes")
+                self.copy_and_replace_query(query_path, [new_link_path])
         for n in range(len(self.nodes)):
             for p in self.partitions:
                 query_path = get_cluster_query_path(self.base_paths, test, p, n)
@@ -250,6 +300,13 @@ class WeatherBenchmark:
     def copy_local_query_files(self, test, reset):
         '''Determine the data_link path for local query files and copy with
         new location for collection.'''
+        if 1 in self.partitions and len(self.base_paths) > 1:
+            query_path = get_local_query_path(self.base_paths, test, 0)
+            prepare_path(query_path, reset)
+    
+            # Copy query files.
+            new_link_path = self.get_zero_partition_path(0, "data_links/" + test)
+            self.copy_and_replace_query(query_path, [new_link_path])
         for p in self.partitions:
             query_path = get_local_query_path(self.base_paths, test, p)
             prepare_path(query_path, reset)

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/49ce42cb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index 5bfa698..c18ac43 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -207,15 +207,15 @@ def main(argv):
             print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').'
             data.reset()
             if section == "partition_scheme":
-                benchmark.print_partition_scheme(xml_data_save_path)
+                benchmark.print_partition_scheme()
             else:
                 data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
     
         if section in ("all", "test_links"):
             # TODO determine current node 
             print 'Processing the test links section (' + dataset.get_name() + ').'
-            benchmark.print_partition_scheme(xml_data_save_path)
-            benchmark.build_data_links(xml_data_save_path)
+            benchmark.print_partition_scheme()
+            benchmark.build_data_links()
 
         if section in ("all", "queries"):
             print 'Processing the queries section (' + dataset.get_name() + ').'

Reply via email to