A collection of changes to the benchmark scripts.

- Added more logging.
- Created partitioned files instead of monthly files.
- Cleaned up unused code.
- Added a script for MRQL tests.


Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/481f9edb
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/481f9edb
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/481f9edb

Branch: refs/heads/site
Commit: 481f9edbd118e202671915a2605cb5540ad98723
Parents: d76e4ed
Author: Preston Carman <[email protected]>
Authored: Fri Jun 27 14:26:25 2014 -0700
Committer: Preston Carman <[email protected]>
Committed: Fri Jun 27 14:26:25 2014 -0700

----------------------------------------------------------------------
 .../noaa-ghcn-daily/scripts/run_benchmark.sh    |   7 +-
 .../scripts/run_benchmark_cluster.sh            |  59 +++---
 .../noaa-ghcn-daily/scripts/run_mrql_test.sh    |  29 +++
 .../scripts/weather_benchmark.py                | 138 ++++++++++---
 .../noaa-ghcn-daily/scripts/weather_cli.py      |  11 +-
 .../noaa-ghcn-daily/scripts/weather_config.py   |  12 +-
 .../scripts/weather_convert_to_xml.py           | 200 +------------------
 .../scripts/weather_data_files.py               | 107 +++++-----
 8 files changed, 255 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh
index 2dd070c..b2b1531 100755
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh
@@ -25,6 +25,8 @@
 # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ 
"-client-net-ip-address 169.235.27.138"
 # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "" q03
 #
+REPEAT=5
+FRAME_SIZE=10000
 
 if [ -z "${1}" ]
 then
@@ -32,15 +34,18 @@ then
     exit
 fi
 
+export JAVA_OPTS="$JAVA_OPTS -server -Xmx8G -XX:+HeapDumpOnOutOfMemoryError 
-Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/benchmark_logging.properties"
+
 for j in $(find ${1} -name '*q??.xq')
 do
     if [ -z "${3}" ] || [[ "${j}" =~ "${3}" ]] 
     then
+       date
         echo "Running query: ${j}"
         log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log"
         log_base_path=$(dirname ${j/queries/query_logs})
         mkdir -p ${log_base_path}
-       time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing 
-showquery -showoet -showrp -frame-size 10000 -repeatexec 10 > 
${log_base_path}/${log_file} 2>&1
+        time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing 
-showquery -showoet -showrp -frame-size ${FRAME_SIZE} -repeatexec ${REPEAT} > 
${log_base_path}/${log_file} 2>&1
     fi;
 done
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh
index a77f3c2..f868024 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh
@@ -25,8 +25,8 @@
 # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ 
"-client-net-ip-address 169.235.27.138"
 # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "" q03
 #
-
-CLUSTER_COUNT=5
+REPEAT=5
+FRAME_SIZE=10000
 
 if [ -z "${1}" ]
 then
@@ -34,30 +34,39 @@ then
     exit
 fi
 
-# Run queries for each number of nodes.
-for (( i = 0; i < ${CLUSTER_COUNT}; i++ ))
-do 
-    echo "Starting ${i} cluster nodes"
-    python vxquery-server/src/main/resources/scripts/cluster_cli.py -c 
vxquery-server/src/main/resources/conf/${i}nodes.xml -a start
-    
-    for j in $(find ${1} -name '*q??.xq')
-    do
-        # Only work with i nodes.
-        if [[ "${j}" =~ "${i}nodes" ]]
+if [ -z "${2}" ]
+then
+    echo "Please the number of nodes (start at 0)."
+    exit
+fi
+
+# Run queries for the specified number of nodes.
+echo "Starting ${2} cluster nodes"
+python vxquery-server/src/main/resources/scripts/cluster_cli.py -c 
vxquery-server/src/main/resources/conf/${2}nodes.xml -a start
+
+# wait for cluster to finish setting up  
+sleep 5
+
+export JAVA_OPTS="$JAVA_OPTS -server -Xmx8G -XX:+HeapDumpOnOutOfMemoryError 
-Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/benchmark_logging.properties"
+
+for j in $(find ${1} -name '*q??.xq')
+do
+    # Only work with i nodes.
+    if [[ "${j}" =~ "${2}nodes" ]]
+    then
+        # Only run for specified queries.
+        if [ -z "${4}" ] || [[ "${j}" =~ "${4}" ]]
         then
-            # Only run for specified queries.
-            if [ -z "${3}" ] || [[ "${j}" =~ "${3}" ]]
-            then
-                echo "Running query: ${j}"
-                log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log"
-                log_base_path=$(dirname ${j/queries/query_logs})
-                mkdir -p ${log_base_path}
-                time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} 
-timing -showquery -showoet -showrp -frame-size 10000 -repeatexec 10 > 
${log_base_path}/${log_file} 2>&1
-            fi;
+               date
+            echo "Running query: ${j}"
+            log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log"
+            log_base_path=$(dirname ${j/queries/query_logs})
+            mkdir -p ${log_base_path}
+            time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${3} 
-timing -showquery -showoet -showrp -frame-size ${FRAME_SIZE} -repeatexec 
${REPEAT} > ${log_base_path}/${log_file} 2>&1
         fi;
-    done
-    
-    # Stop cluster.
-    python vxquery-server/src/main/resources/scripts/cluster_cli.py -c 
vxquery-server/src/main/resources/conf/${i}nodes.xml -a stop
+    fi;
 done
+    
+# Stop cluster.
+python vxquery-server/src/main/resources/scripts/cluster_cli.py -c 
vxquery-server/src/main/resources/conf/${2}nodes.xml -a stop
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh
new file mode 100644
index 0000000..dd25c01
--- /dev/null
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+export JAVA_HOME=/home/ecarm002/java/jdk1.6.0_45
+REPEAT=${1}
+
+#for n in `seq 0 7`
+for n in 6
+do
+       date
+       echo "Running q0${n} for MRQL."
+       time for i in {1..${REPEAT}}; do ~/mrql/incubator-mrql/bin/mrql -dist 
-nodes 5 
~/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/other_systems/mrql/q0${n}.mrql
 > weather_data/mrql/query_logs/gsn/q0${n}.mrql.log 2>&1; done; 
+done

http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 081f80a..f3c9e68 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -32,10 +32,39 @@ from weather_data_files import *
 #   logs/
 class WeatherBenchmark:
 
+    DATA_LINKS_FOLDER = "data_links/"
+    LARGE_FILE_ROOT_TAG = WeatherDataFiles.LARGE_FILE_ROOT_TAG
     QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/"
     QUERY_MASTER_FOLDER = "../queries/"
-    QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", 
"q05.xq", "q06.xq", "q07.xq"] 
-    QUERY_UTILITY_LIST = ["sensor_count.xq", "station_count.xq", 
"q04_sensor.xq", "q04_station.xq", "q05_sensor.xq", "q05_station.xq", 
"q06_sensor.xq", "q06_station.xq", "q07_tmin.xq", "q07_tmax.xq"] 
+    QUERY_FILE_LIST = [
+                       "q00.xq",
+                       "q01.xq",
+                       "q02.xq",
+                       "q03.xq",
+                       "q04.xq",
+                       "q05.xq",
+                       "q06.xq",
+                       "q07.xq"
+                       ] 
+    QUERY_UTILITY_LIST = [
+                          "sensor_count.xq",
+                          "station_count.xq",
+                          "q04_join_count.xq",
+                          "q04_sensor.xq",
+                          "q04_station.xq",
+                          "q05_join_count.xq",
+                          "q05_sensor.xq",
+                          "q05_station.xq",
+                          "q06_join_count.xq",
+                          "q06_sensor.xq",
+                          "q06_station.xq",
+                          "q07_join_count.xq",
+                          "q07_tmin.xq",
+                          "q07_tmin_values.xq",
+                          "q07_tmin_self.xq",
+                          "q07_tmax.xq",
+                          "q07_tmax_values.xq"
+                          ] 
     BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"] 
     BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"] 
     QUERY_COLLECTIONS = ["sensors", "stations"]
@@ -48,30 +77,30 @@ class WeatherBenchmark:
         self.dataset = dataset
         self.nodes = nodes
         
-    def print_partition_scheme(self, xml_save_path):
+    def print_partition_scheme(self):
         if (len(self.base_paths) == 0):
             return
         for test in self.dataset.get_tests():
             if test in self.BENCHMARK_LOCAL_TESTS:
-                self.print_local_partition_schemes(test, xml_save_path)
+                self.print_local_partition_schemes(test)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
-                self.print_cluster_partition_schemes(test, xml_save_path)
+                self.print_cluster_partition_schemes(test)
             else:
                 print "Unknown test."
                 exit()
             
-    def print_local_partition_schemes(self, test, xml_save_path):
+    def print_local_partition_schemes(self, test):
         node_index = 0
         virtual_partitions = get_local_virtual_partitions(self.partitions)
         for p in self.partitions:
-            scheme = self.get_local_partition_scheme(test, xml_save_path, p)
+            scheme = self.get_local_partition_scheme(test, p)
             self.print_partition_schemes(virtual_partitions, scheme, test, p, 
node_index)
         
-    def print_cluster_partition_schemes(self, test, xml_save_path):
+    def print_cluster_partition_schemes(self, test):
         node_index = self.get_current_node_index()
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, 
self.partitions)
         for p in self.partitions:
-            scheme = self.get_cluster_partition_scheme(test, xml_save_path, p)
+            scheme = self.get_cluster_partition_scheme(test, p)
             self.print_partition_schemes(virtual_partitions, scheme, test, p, 
node_index)
         
     def print_partition_schemes(self, virtual_partitions, scheme, test, 
partitions, node_id):
@@ -94,11 +123,11 @@ class WeatherBenchmark:
         else:
             print "    Scheme is EMPTY."
 
-    def get_local_partition_scheme(self, test, xml_save_path, partition):
+    def get_local_partition_scheme(self, test, partition):
         scheme = []
         virtual_partitions = get_local_virtual_partitions(self.partitions)
         data_schemes = get_partition_scheme(0, virtual_partitions, 
self.base_paths)
-        link_base_schemes = get_partition_scheme(0, partition, 
self.base_paths, "data_links/" + test)
+        link_base_schemes = get_partition_scheme(0, partition, 
self.base_paths, self.DATA_LINKS_FOLDER + test)
 
         # Match link paths to real data paths.
         group_size = len(data_schemes) / len(link_base_schemes)
@@ -117,7 +146,7 @@ class WeatherBenchmark:
                     offset += group_size
         return scheme
     
-    def get_cluster_partition_scheme(self, test, xml_save_path, partition):
+    def get_cluster_partition_scheme(self, test, partition):
         node_index = self.get_current_node_index()
         if node_index == -1:
             print "Unknown host."
@@ -127,7 +156,7 @@ class WeatherBenchmark:
         local_virtual_partitions = 
get_local_virtual_partitions(self.partitions)
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, 
self.partitions)
         data_schemes = get_partition_scheme(node_index, virtual_partitions, 
self.base_paths)
-        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 
partition, self.base_paths, "data_links/" + test)
+        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 
partition, self.base_paths, self.DATA_LINKS_FOLDER + test)
 
         # Match link paths to real data paths.
         for link_node, link_disk, link_virtual, link_index, link_path in 
link_base_schemes:
@@ -145,6 +174,7 @@ class WeatherBenchmark:
             has_data = True
             if link_node < node_index:
                 has_data = False
+                
             # Make links
             for date_node, data_disk, data_virtual, data_index, data_path in 
data_schemes:
                 if has_data and data_disk == link_disk \
@@ -153,36 +183,68 @@ class WeatherBenchmark:
             scheme.append([link_disk, -1, link_index, "", link_path])
         return scheme
     
-    def build_data_links(self, xml_save_path):
+    def build_data_links(self, reset):
         if (len(self.base_paths) == 0):
             return
+        if reset:
+            shutil.rmtree(self.base_paths[0] + self.DATA_LINKS_FOLDER)
         for test in self.dataset.get_tests():
             if test in self.BENCHMARK_LOCAL_TESTS:
                 for i in self.partitions:
-                    scheme = self.get_local_partition_scheme(test, 
xml_save_path, i)
+                    scheme = self.get_local_partition_scheme(test, i)
+                    self.build_data_links_scheme(scheme)
+                if 1 in self.partitions and len(self.base_paths) > 1:
+                    scheme = self.build_data_links_local_zero_partition(test)
                     self.build_data_links_scheme(scheme)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
                 for i in self.partitions:
-                    scheme = self.get_cluster_partition_scheme(test, 
xml_save_path, i)
+                    scheme = self.get_cluster_partition_scheme(test, i)
+                    self.build_data_links_scheme(scheme)
+                if 1 in self.partitions and len(self.base_paths) > 1:
+                    scheme = self.build_data_links_cluster_zero_partition(test)
                     self.build_data_links_scheme(scheme)
             else:
                 print "Unknown test."
                 exit()
     
     def build_data_links_scheme(self, scheme):
-        """Build all the data links based on the scheme information."""
-        link_path_cleared = []
+        '''Build all the data links based on the scheme information.'''
         for (data_disk, data_index, partition, data_path, link_path) in scheme:
-            if link_path not in link_path_cleared and os.path.isdir(link_path):
-                shutil.rmtree(link_path)
-                link_path_cleared.append(link_path)
             self.add_collection_links_for(data_path, link_path, data_index)
     
+    def build_data_links_cluster_zero_partition(self, test):
+        '''Build a scheme for all data in one symbolically linked folder. (0 
partition)'''
+        scheme = []
+        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, 
self.base_paths, self.DATA_LINKS_FOLDER + test)
+        for link_node, link_disk, link_virtual, link_index, link_path in 
link_base_schemes:
+            new_link_path = self.get_zero_partition_path(link_node, 
self.DATA_LINKS_FOLDER + test + "/" + str(link_node) + "nodes")
+            scheme.append([0, link_disk, 0, link_path, new_link_path])
+        return scheme
+
+    def build_data_links_local_zero_partition(self, test):
+        '''Build a scheme for all data in one symbolically linked folder. (0 
partition)'''
+        scheme = []
+        index = 0
+        link_base_schemes = get_partition_scheme(0, 1, self.base_paths, 
self.DATA_LINKS_FOLDER + test)
+        for link_node, link_disk, link_virtual, link_index, link_path in 
link_base_schemes:
+            if test == "local_batch_scale_out" and index > 0:
+                continue
+            new_link_path = self.get_zero_partition_path(link_node, 
self.DATA_LINKS_FOLDER + test)
+            scheme.append([0, index, 0, link_path, new_link_path])
+            index += 1
+        return scheme
+
+    def get_zero_partition_path(self, node, key):
+        '''Return a partition path for the zero partition.'''
+        base_path = self.base_paths[0]
+        new_link_path = get_partition_scheme(node, 1, [base_path], 
key)[0][PARTITION_INDEX_PATH]
+        return new_link_path.replace("p1", "p0")
+        
     def get_current_node_index(self):
         found = False
         node_index = 0
         for machine in self.nodes:
-            if socket.gethostname() == machine.get_node_name():
+            if socket.gethostname().startswith(machine.get_node_name()):
                 found = True
                 break
             node_index += 1
@@ -195,10 +257,13 @@ class WeatherBenchmark:
     def add_collection_links_for(self, real_path, link_path, index):
         for collection in self.QUERY_COLLECTIONS:
             collection_path = link_path + collection + "/"
+            collection_index = collection_path + "index" + str(index)
             if not os.path.isdir(collection_path):
                 os.makedirs(collection_path)
             if index >= 0:
-                os.symlink(real_path + collection + "/", collection_path + 
"index" + str(index))
+                if os.path.islink(collection_index):
+                    os.unlink(collection_index)
+                os.symlink(real_path + collection + "/", collection_index)
             
     def copy_query_files(self, reset):
         for test in self.dataset.get_tests():
@@ -213,25 +278,39 @@ class WeatherBenchmark:
     def copy_cluster_query_files(self, test, reset):
         '''Determine the data_link path for cluster query files and copy with
         new location for collection.'''
-        partitions = self.dataset.get_partitions()[0]
+        if 1 in self.partitions and len(self.base_paths) > 1:
+            for n in range(len(self.nodes)):
+                query_path = get_cluster_query_path(self.base_paths, test, 0, 
n)
+                prepare_path(query_path, reset)
+            
+                # Copy query files.
+                new_link_path = self.get_zero_partition_path(n, 
self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes")
+                self.copy_and_replace_query(query_path, [new_link_path])
         for n in range(len(self.nodes)):
             for p in self.partitions:
                 query_path = get_cluster_query_path(self.base_paths, test, p, 
n)
                 prepare_path(query_path, reset)
             
                 # Copy query files.
-                partition_paths = get_partition_paths(n, p, self.base_paths, 
"data_links/" + test + "/" + str(n) + "nodes")
+                partition_paths = get_partition_paths(n, p, self.base_paths, 
self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes")
                 self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_local_query_files(self, test, reset):
         '''Determine the data_link path for local query files and copy with
         new location for collection.'''
+        if 1 in self.partitions and len(self.base_paths) > 1:
+            query_path = get_local_query_path(self.base_paths, test, 0)
+            prepare_path(query_path, reset)
+    
+            # Copy query files.
+            new_link_path = self.get_zero_partition_path(0, 
self.DATA_LINKS_FOLDER + test)
+            self.copy_and_replace_query(query_path, [new_link_path])
         for p in self.partitions:
             query_path = get_local_query_path(self.base_paths, test, p)
             prepare_path(query_path, reset)
     
             # Copy query files.
-            partition_paths = get_partition_paths(0, p, self.base_paths, 
"data_links/" + test)
+            partition_paths = get_partition_paths(0, p, self.base_paths, 
self.DATA_LINKS_FOLDER + test)
             self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_and_replace_query(self, query_path, replacement_list):
@@ -250,6 +329,13 @@ class WeatherBenchmark:
                 for line in fileinput.input(query_path + query_file, True):
                     sys.stdout.write(line.replace(self.QUERY_REPLACEMENT_KEY + 
collection, replace_string))
                     
+            # Make a search replace for partition type.
+            if self.dataset.get_partition_type() == "large_files":
+                for line in fileinput.input(query_path + query_file, True):
+                    sys.stdout.write(line.replace("/stationCollection", "/" + 
self.LARGE_FILE_ROOT_TAG + "/stationCollection"))
+                for line in fileinput.input(query_path + query_file, True):
+                    sys.stdout.write(line.replace("/dataCollection", "/" + 
self.LARGE_FILE_ROOT_TAG + "/dataCollection"))
+                    
     def get_number_of_slices(self):
         if len(self.dataset.get_tests()) == 0:
             print "No test has been defined in config file."

http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index 5bfa698..8ac6d17 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -207,15 +207,18 @@ def main(argv):
             print 'Processing the partition section (' + dataset.get_name() + 
':d' + str(len(base_paths)) + ':s' + str(slices) + ').'
             data.reset()
             if section == "partition_scheme":
-                benchmark.print_partition_scheme(xml_data_save_path)
+                benchmark.print_partition_scheme()
             else:
-                data.copy_to_n_partitions(xml_data_save_path, slices, 
base_paths, reset)
+                if dataset.get_partition_type() == "large_files":
+                    data.build_to_n_partition_files(xml_data_save_path, 
slices, base_paths, reset)
+                else:
+                    data.copy_to_n_partitions(xml_data_save_path, slices, 
base_paths, reset)
     
         if section in ("all", "test_links"):
             # TODO determine current node 
             print 'Processing the test links section (' + dataset.get_name() + 
').'
-            benchmark.print_partition_scheme(xml_data_save_path)
-            benchmark.build_data_links(xml_data_save_path)
+            benchmark.print_partition_scheme()
+            benchmark.build_data_links(reset)
 
         if section in ("all", "queries"):
             print 'Processing the queries section (' + dataset.get_name() + 
').'

http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
index a6513c2..80607b8 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
@@ -41,9 +41,10 @@ class WeatherConfig:
         for node in self.config.getElementsByTagName("dataset"):
             name = self.get_dataset_name(node)
             save_paths = self.get_dataset_save_paths(node)
+            partition_type = self.get_dataset_partition_type(node)
             partitions = self.get_dataset_partitions(node)
             tests = self.get_dataset_tests(node)
-            nodes.append(Dataset(name, save_paths, partitions, tests))
+            nodes.append(Dataset(name, save_paths, partition_type, partitions, 
tests))
         return nodes
 
 
@@ -69,6 +70,9 @@ class WeatherConfig:
             paths.append(self.get_text(item))
         return paths
 
+    def get_dataset_partition_type(self, node):
+        return self.get_text(node.getElementsByTagName("partition_type")[0])
+
     def get_dataset_partitions(self, node):
         paths = []
         for item in node.getElementsByTagName("partitions_per_path"):
@@ -103,10 +107,11 @@ class Machine:
         return self.id + "(" + self.ip + ")"
     
 class Dataset:
-    def __init__(self, name, save_paths, partitions, tests):
+    def __init__(self, name, save_paths, partition_type, partitions, tests):
         self.name = name
         self.save_paths = save_paths
         self.partitions = partitions
+        self.partition_type = partition_type
         self.tests = tests
     
     def get_name(self):
@@ -118,6 +123,9 @@ class Dataset:
     def get_partitions(self):
         return self.partitions
     
+    def get_partition_type(self):
+        return self.partition_type
+    
     def get_tests(self):
         return self.tests
     

http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
index a4f33a1..5db090a 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
@@ -150,10 +150,6 @@ class WeatherConvertToXML:
         # Default
         return 0
     
-    def process_one_day(self, records, report_date, page):
-        # Default
-        return 0
-    
     def process_station_data(self, row):
         # Default
         return 0
@@ -165,7 +161,7 @@ class WeatherConvertToXML:
         print "Processing inventory file"
         file_stream = open(self.ghcnd_inventory, 'r')
         
-        csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT',  'MAX_YEARS', 
'TOTAL_YEARS_FOR_ALL_SENSORS']
+        csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 
'TOTAL_YEARS_FOR_ALL_SENSORS']
         row = file_stream.readline()
         csv_inventory = {}
         for row in file_stream:
@@ -243,16 +239,6 @@ class WeatherConvertToXML:
     def convert_c2f(self, c):
         return (9 / 5 * c) + 32
     
-    def default_xml_start(self):
-        return textwrap.dedent("""\
-            <?xml version="1.0" encoding="ISO-8859-1"?>
-            <ghcnd_observation version="1.0"
-                 xmlns:xsd="http://www.w3.org/2001/XMLSchema";
-                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
-                <credit>NOAA's National Climatic Data Center (NCDC)</credit>
-                <credit_URL>http://www.ncdc.noaa.gov/</credit_URL>
-            """)
-    
     def default_xml_web_service_start(self):
         field_xml = ""
         field_xml += "<?xml version=\"1.0\" encoding=\"UTF-8\" 
standalone=\"yes\"?>\n"
@@ -273,76 +259,6 @@ class WeatherConvertToXML:
         field_xml += self.get_indent_space(indent) + "<date>" + 
str(report_date.year) + "-" + str(report_date.month).zfill(2) + "-" + 
str(report_date.day).zfill(2) + "T00:00:00.000</date>\n"
         return field_xml
     
-    def get_date_from_field(self, row, field):
-        report_date = self.get_field_from_definition(row, field)
-        return str(report_date.year) + "-" + str(report_date.month).zfill(2) + 
"-" + str(report_date.day).zfill(2)
-    
-    def default_xml_field_date_iso8601(self, report_date):
-        field_xml = ""
-        field_xml += "    <observation_date>" + self.MONTHS[report_date.month 
- 1] + " " + str(report_date.day) + ", " + str(report_date.year) + 
"</observation_date>\n"
-        field_xml += "    <observation_date_iso8601>" + 
report_date.isoformat() + "</observation_date_iso8601>\n"
-        return field_xml
-    
-    def default_xml_field_date_year(self, year):
-        field_xml = ""
-        field_xml += "    <observation_year>" + str(year) + 
"</observation_year>\n"
-        return field_xml
-
-    def default_xml_field_date_month(self, month):
-        field_xml = ""
-        field_xml += "    <observation_month>" + str(month) + 
"</observation_month>\n"
-        return field_xml
-
-    def default_xml_field_date_day(self, day):
-        field_xml = ""
-        field_xml += "            <observation_day>" + str(day) + 
"</observation_day>\n"
-        return field_xml
-    
-    def default_xml_field_station_id(self, station_id, indent=2):
-        field_xml = ""
-        field_xml += self.get_indent_space(indent) + "<station_id>" + 
station_id + "</station_id>\n"
-        return field_xml
-    
-    def default_xml_field_station(self, station_id):
-        station_row = ""
-        stations_file = open(self.ghcnd_stations, 'r')
-        
-        for line in stations_file:
-            if station_id == self.get_field_from_definition(line, 
STATIONS_FIELDS['ID']):
-                station_row = line
-                break
-        
-        field_xml = ""
-        field_xml += "    <station_id>" + station_id + "</station_id>\n"
-        field_xml += "    <location>\n"
-        field_xml += "        <latitude>" + 
self.get_field_from_definition(station_row, 
STATIONS_FIELDS['LATITUDE']).strip() + "</latitude>\n"
-        field_xml += "        <longitude>" + 
self.get_field_from_definition(station_row, 
STATIONS_FIELDS['LONGITUDE']).strip() + "</longitude>\n"
-        
-        elevation = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['ELEVATION']).strip()
-        if elevation != "-999.9":
-            field_xml += "        <elevation>" + elevation + "</elevation>\n"
-        
-        field_xml += "    </location>\n"
-        field_xml += "    <name>" + 
self.get_field_from_definition(station_row, STATIONS_FIELDS['NAME']).strip() + 
"</name>\n"
-    
-        state = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['STATE'])
-        if state.strip():
-            field_xml += "    <state>" + state + "</state>\n"
-    
-        gsn = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['GSNFLAG'])
-        if gsn.strip():
-            field_xml += "    <gsn />\n"
-    
-        hcn = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['HCNFLAG'])
-        if hcn.strip():
-            field_xml += "    <hcn />\n"
-    
-        wmoid = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['WMOID'])
-        if wmoid.strip():
-            field_xml += "    <wmoid id=\"" + wmoid + "\" />\n"
-    
-        return field_xml
-    
     def default_xml_mshr_station_additional(self, station_id):
         """The web service station data is generate from the MSHR data 
supplemented with GHCN-Daily."""
         station_mshr_row = ""
@@ -492,120 +408,6 @@ class WeatherConvertToXML:
     def get_indent_space(self, indent):
         return (" " * (4 * indent))
     
-class WeatherDailyXMLFile(WeatherConvertToXML):
-    def process_one_month_sensor_set(self, records, page):
-        year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR))
-        month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH))
-    
-        station_id = self.get_dly_field(records[0], DLY_FIELD_ID)
-        
-        count = 0
-        for day in range(1, 32):
-            try:
-                # TODO find out what is a valid python date range? 1889?
-                # Attempt to see if this is valid date.
-                report_date = date(year, month, day)
-                save_file_name = self.process_one_day(records, report_date, 
page)
-                if save_file_name is not "":
-                    count = count + 1
-                    if self.debug_output:
-                        print "Wrote file: " + save_file_name
-            except ValueError:
-                if self.debug_output:
-                    print "Error: Not a valid date (" + str(month) + "/" + 
str(day) + "/" + str(year) + ") for " + station_id + "."
-                pass
-        return count
-
-    def process_one_day(self, records, report_date, page):
-        station_id = self.get_dly_field(records[0], DLY_FIELD_ID)
-        found_data = False
-                
-        # Information for each daily file.
-        daily_xml_file = self.default_xml_start()
-        daily_xml_file += self.default_xml_field_station(station_id)
-        daily_xml_file += self.default_xml_field_date_iso8601(report_date)
-        daily_xml_file += self.default_xml_start_tag("sensors")
-        for record in records:
-            record_xml_snip = self.default_xml_day_reading_as_field(record, 
report_date.day)
-            if record_xml_snip is not "":
-                found_data = True
-            daily_xml_file += record_xml_snip
-        daily_xml_file += self.default_xml_end_tag("sensors")
-        daily_xml_file += self.default_xml_end()
-        
-        if not found_data:
-            return ""
-        
-        # Make sure the station folder is available.
-        ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + 
station_id + "/" + str(report_date.year) + "/"
-        if not os.path.isdir(ghcnd_xml_station_path):
-            os.makedirs(ghcnd_xml_station_path)
-        
-        # Save XML string to disk.
-        save_file_name = ghcnd_xml_station_path + 
build_sensor_save_filename(station_id, report_date, page)
-        save_file_name = self.save_file(save_file_name, daily_xml_file)
-                
-        return save_file_name
-    
-class WeatherMonthlyXMLFile(WeatherConvertToXML):
-    def process_one_month_sensor_set(self, records, page):
-        found_data = False        
-        year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR))
-        month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH))
-    
-        station_id = self.get_dly_field(records[0], DLY_FIELD_ID)
-
-        # Information for each daily file.
-        daily_xml_file = self.default_xml_start()
-        daily_xml_file += self.default_xml_field_station(station_id)
-        daily_xml_file += self.default_xml_field_date_year(year)
-        daily_xml_file += self.default_xml_field_date_month(month)
-        daily_xml_file += self.default_xml_start_tag("readings")
-        
-        for day in range(1, 32):
-            try:
-                # TODO find out what is a valid python date range? 1889?
-                # Attempt to see if this is valid date.
-                report_date = date(year, month, day)
-                found_daily_data = False
-                record_xml_snip = ""
-
-                for record in records:
-                    record_xml_snip += 
self.default_xml_day_reading_as_field(record, report_date.day)
-                    if record_xml_snip is not "":
-                        found_data = True
-                        found_daily_data = True
-
-                if found_daily_data:
-                    daily_xml_file += self.default_xml_start_tag("reading", 2)
-                    daily_xml_file += self.default_xml_field_date_day(day)
-                    daily_xml_file += record_xml_snip
-                    daily_xml_file += self.default_xml_end_tag("reading", 2)
-                
-            except ValueError:
-                pass
-
-        daily_xml_file += self.default_xml_end_tag("readings")
-        daily_xml_file += self.default_xml_end()
-
-        if not found_data:
-            return 0
-
-        # Make sure the station folder is available.
-        ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + 
station_id + "/" + str(report_date.year) + "/"
-        if not os.path.isdir(ghcnd_xml_station_path):
-            os.makedirs(ghcnd_xml_station_path)
-                
-        # Save XML string to disk.
-        save_file_name = ghcnd_xml_station_path + 
build_sensor_save_filename(station_id, report_date, page)
-        save_file_name = self.save_file(save_file_name, daily_xml_file)
-
-        if save_file_name is not "":
-            if self.debug_output:
-                print "Wrote file: " + save_file_name
-            return 1
-        else:
-            return 0
 
 class WeatherWebServiceMonthlyXMLFile(WeatherConvertToXML):
     """The web service class details how to create files similar to the NOAA 
web service."""

http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index a7fb691..1c9f129 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -26,6 +26,8 @@ from collections import OrderedDict
 # Allows partitioning and picking up where you left off.
 class WeatherDataFiles:
 
+    LARGE_FILE_ROOT_TAG = "root"
+
     INDEX_DATA_FILE_NAME = 0
     INDEX_DATA_SENSORS_STATUS = 1
     INDEX_DATA_STATION_STATUS = 2
@@ -51,14 +53,10 @@ class WeatherDataFiles:
         self.current = self.DATA_FILE_START_INDEX
         self.progress_data = []
 
-        
-    def get_file_list(self):
-        return glob.glob(self.base_path + "/*" + self.DATA_FILE_EXTENSION)
-
     def get_file_list_iterator(self):
+        """Return the list of files one at a time."""
         return glob.iglob(self.base_path + "/*" + self.DATA_FILE_EXTENSION)
 
-
     # Save Functions
     def build_progress_file(self, options, convert):
         if not os.path.isfile(self.progress_file_name) or 'reset' in options:
@@ -98,10 +96,8 @@ class WeatherDataFiles:
             return
         
         # Initialize the partition paths.
-        partition_sizes = []
         partition_paths = get_partition_paths(0, partitions, base_paths)
         for path in partition_paths:
-            partition_sizes.append(0)
             # Make sure the xml folder is available.
             prepare_path(path, reset)
 
@@ -145,66 +141,72 @@ class WeatherDataFiles:
             if current_station_partition >= len(partition_paths):
                 current_station_partition = 0
 
-    
-    def copy_to_n_partitions_by_station(self, save_path, partitions, 
base_paths, reset):
-        """Once the initial data has been generated, the data can be copied 
into a set number of partitions. """
+    def build_to_n_partition_files(self, save_path, partitions, base_paths, 
reset):
+        """Once the initial data has been generated, the data can be divided 
into partitions 
+        and stored in single files.
+        """
         if (len(base_paths) == 0):
             return
         
+        XML_START = "<?xml version=\"1.0\" encoding=\"UTF-8\" 
standalone=\"yes\"?>"
+        
         # Initialize the partition paths.
-        partition_sizes = []
         partition_paths = get_partition_paths(0, partitions, base_paths)
+        sensors_partition_files = []
+        stations_partition_files = []
         for path in partition_paths:
-            partition_sizes.append(0)
             # Make sure the xml folder is available.
             prepare_path(path, reset)
+            prepare_path(path + "sensors/", False)
+            prepare_path(path + "stations/", False)
+            sensors_partition_files.append(open(path + 
"sensors/partition.xml", 'w'))
+            stations_partition_files.append(open(path + 
"stations/partition.xml", 'w'))
+    
+        for row in range(0, len(partition_paths)):
+            sensors_partition_files[row].write(XML_START + "<" + 
self.LARGE_FILE_ROOT_TAG + ">\n")
+            stations_partition_files[row].write(XML_START + "<" + 
self.LARGE_FILE_ROOT_TAG + ">\n")
 
-        # copy stations and sensors into each partition
-        current_partition = 0
-        csv_sorted = self.get_csv_in_partition_order()
-        for item, size in csv_sorted.iteritems():
-            if size < 0:
-                print "The progress file does not have the sensor size data 
saved."
-                return
-            
-            station_id = item.split('.')[0]
-            # Update partition bases on smallest current size.
-            current_partition = partition_sizes.index(min(partition_sizes))
-            
-            # Copy sensor files
-            type = "sensors"
-            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id
-            new_file_path = 
build_base_save_folder(partition_paths[current_partition], station_id, type) + 
station_id
-            if os.path.isdir(file_path):
-                distutils.dir_util.copy_tree(file_path, new_file_path)
-            partition_sizes[current_partition] += size
+        import fnmatch
+        import os
         
-            # Copy station files
-            type = "stations"
-            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id + ".xml"
-            new_file_base = 
build_base_save_folder(partition_paths[current_partition], station_id, type)
-            new_file_path = new_file_base + station_id + ".xml"
-            if os.path.isfile(file_path):
-                if not os.path.isdir(new_file_base):
-                    os.makedirs(new_file_base)
-                shutil.copyfile(file_path, new_file_path)
-    
-    def get_csv_in_partition_order(self):
+        # copy stations and sensors into each partition
+        current_sensor_partition = 0
+        current_station_partition = 0
         self.open_progress_data()
         row_count = len(self.progress_data)
-        
-        # Get the dictionary of all the files and data sizes.
-        csv_dict = dict()
         for row in range(0, row_count):
             row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
             file_name = row_contents[self.INDEX_DATA_FILE_NAME]
-            folder_data = int(row_contents[self.INDEX_DATA_FOLDER_DATA])
+            station_id = os.path.basename(file_name).split('.')[0]
+               
+            # Copy sensor files
+            type = "sensors"
+            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id
+            for root, dirnames, filenames in os.walk(file_path):
+                for filename in fnmatch.filter(filenames, '*.xml'):
+                    xml_path = os.path.join(root, filename)
+                    xml_data = file_get_contents(xml_path).replace(XML_START, 
"") + "\n"
+                    
sensors_partition_files[current_sensor_partition].write(xml_data)
+                    current_sensor_partition += 1
+                    if current_sensor_partition >= 
len(sensors_partition_files):
+                        current_sensor_partition = 0
             
-            csv_dict[file_name] = folder_data
-        
-        # New sorted list.
-        return OrderedDict(sorted(csv_dict.items(), key=lambda x: x[1], 
reverse=True))
-        
+            # Copy station files
+            type = "stations"
+            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id + ".xml"
+            xml_path = os.path.join(root, file_path)
+            xml_data = file_get_contents(xml_path).replace(XML_START, "") + 
"\n"
+            stations_partition_files[current_station_partition].write(xml_data)
+            current_station_partition += 1
+            if current_station_partition >= len(partition_paths):
+                current_station_partition = 0
+                
+        for row in range(0, len(partition_paths)):
+            sensors_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG 
+ ">\n")
+            sensors_partition_files[row].close()
+            stations_partition_files[row].write("</" + 
self.LARGE_FILE_ROOT_TAG + ">\n")
+            stations_partition_files[row].close()
+
     def get_file_row(self, file_name):
         for i in range(0, len(self.progress_data)):
             if self.progress_data[i].startswith(file_name):
@@ -405,3 +407,6 @@ def prepare_path(path, reset):
     if not os.path.isdir(path):
         os.makedirs(path)
 
+def file_get_contents(filename):
+    with open(filename) as f:
+        return f.read()

Reply via email to