Benchmark cleanup and a new option for creating partitions made of large files
instead of the original small files.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/17a08527
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/17a08527
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/17a08527

Branch: refs/heads/master
Commit: 17a08527c4b689c67118445f7e326cbd50f36a32
Parents: 22f8441
Author: Preston Carman <[email protected]>
Authored: Thu May 8 14:08:04 2014 -0700
Committer: Preston Carman <[email protected]>
Committed: Thu May 8 14:15:35 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                |  53 ++---
 .../noaa-ghcn-daily/scripts/weather_cli.py      |   7 +-
 .../noaa-ghcn-daily/scripts/weather_config.py   |  12 +-
 .../scripts/weather_convert_to_xml.py           | 200 +------------------
 .../scripts/weather_data_files.py               | 107 +++++-----
 5 files changed, 102 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index daae6b2..b862b31 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -32,6 +32,8 @@ from weather_data_files import *
 #   logs/
 class WeatherBenchmark:
 
+    DATA_LINKS_FOLDER = "data_links/"
+    LARGE_FILE_ROOT_TAG = WeatherDataFiles.LARGE_FILE_ROOT_TAG
     QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/"
     QUERY_MASTER_FOLDER = "../queries/"
     QUERY_FILE_LIST = [
@@ -125,7 +127,7 @@ class WeatherBenchmark:
         scheme = []
         virtual_partitions = get_local_virtual_partitions(self.partitions)
         data_schemes = get_partition_scheme(0, virtual_partitions, 
self.base_paths)
-        link_base_schemes = get_partition_scheme(0, partition, 
self.base_paths, "data_links/" + test)
+        link_base_schemes = get_partition_scheme(0, partition, 
self.base_paths, self.DATA_LINKS_FOLDER + test)
 
         # Match link paths to real data paths.
         group_size = len(data_schemes) / len(link_base_schemes)
@@ -154,7 +156,7 @@ class WeatherBenchmark:
         local_virtual_partitions = 
get_local_virtual_partitions(self.partitions)
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, 
self.partitions)
         data_schemes = get_partition_scheme(node_index, virtual_partitions, 
self.base_paths)
-        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 
partition, self.base_paths, "data_links/" + test)
+        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 
partition, self.base_paths, self.DATA_LINKS_FOLDER + test)
 
         # Match link paths to real data paths.
         for link_node, link_disk, link_virtual, link_index, link_path in 
link_base_schemes:
@@ -181,59 +183,57 @@ class WeatherBenchmark:
             scheme.append([link_disk, -1, link_index, "", link_path])
         return scheme
     
-    def build_data_links(self):
+    def build_data_links(self, reset):
         if (len(self.base_paths) == 0):
             return
+        if reset:
+            shutil.rmtree(self.base_paths[0] + self.DATA_LINKS_FOLDER)
         for test in self.dataset.get_tests():
             if test in self.BENCHMARK_LOCAL_TESTS:
                 for i in self.partitions:
                     scheme = self.get_local_partition_scheme(test, i)
-                    self.build_data_links_scheme(scheme, False)
+                    self.build_data_links_scheme(scheme)
                 if 1 in self.partitions and len(self.base_paths) > 1:
                     scheme = self.build_data_links_local_zero_partition(test)
-                    self.build_data_links_scheme(scheme, False)
+                    self.build_data_links_scheme(scheme)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
                 for i in self.partitions:
                     scheme = self.get_cluster_partition_scheme(test, i)
                     self.build_data_links_scheme(scheme)
                 if 1 in self.partitions and len(self.base_paths) > 1:
                     scheme = self.build_data_links_cluster_zero_partition(test)
-                    self.build_data_links_scheme(scheme, False)
+                    self.build_data_links_scheme(scheme)
             else:
                 print "Unknown test."
                 exit()
     
-    def build_data_links_scheme(self, scheme, reset = True):
-        """Build all the data links based on the scheme information."""
-        link_path_cleared = []
+    def build_data_links_scheme(self, scheme):
+        '''Build all the data links based on the scheme information.'''
         for (data_disk, data_index, partition, data_path, link_path) in scheme:
-            if link_path not in link_path_cleared and os.path.isdir(link_path) 
and reset:
-                shutil.rmtree(link_path)
-                link_path_cleared.append(link_path)
             self.add_collection_links_for(data_path, link_path, data_index)
     
     def build_data_links_cluster_zero_partition(self, test):
-        """Build a scheme for all data in one symbolically linked folder. (0 
partition)"""
+        '''Build a scheme for all data in one symbolically linked folder. (0 
partition)'''
         scheme = []
-        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, 
self.base_paths, "data_links/" + test)
+        link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, 
self.base_paths, self.DATA_LINKS_FOLDER + test)
         for link_node, link_disk, link_virtual, link_index, link_path in 
link_base_schemes:
-            new_link_path = self.get_zero_partition_path(link_node, 
"data_links/" + test + "/" + str(link_node) + "nodes")
+            new_link_path = self.get_zero_partition_path(link_node, 
self.DATA_LINKS_FOLDER + test + "/" + str(link_node) + "nodes")
             scheme.append([0, link_disk, 0, link_path, new_link_path])
         return scheme
 
     def build_data_links_local_zero_partition(self, test):
-        """Build a scheme for all data in one symbolically linked folder. (0 
partition)"""
+        '''Build a scheme for all data in one symbolically linked folder. (0 
partition)'''
         scheme = []
         index = 0
-        link_base_schemes = get_partition_scheme(0, 1, self.base_paths, 
"data_links/" + test)
+        link_base_schemes = get_partition_scheme(0, 1, self.base_paths, 
self.DATA_LINKS_FOLDER + test)
         for link_node, link_disk, link_virtual, link_index, link_path in 
link_base_schemes:
-            new_link_path = self.get_zero_partition_path(link_node, 
"data_links/" + test)
+            new_link_path = self.get_zero_partition_path(link_node, 
self.DATA_LINKS_FOLDER + test)
             scheme.append([0, index, 0, link_path, new_link_path])
             index += 1
         return scheme
 
     def get_zero_partition_path(self, node, key):
-        """Return a partition path for the zero partition."""
+        '''Return a partition path for the zero partition.'''
         base_path = self.base_paths[0]
         new_link_path = get_partition_scheme(node, 1, [base_path], 
key)[0][PARTITION_INDEX_PATH]
         return new_link_path.replace("p1", "p0")
@@ -282,7 +282,7 @@ class WeatherBenchmark:
                 prepare_path(query_path, reset)
             
                 # Copy query files.
-                new_link_path = self.get_zero_partition_path(n, "data_links/" 
+ test + "/" + str(n) + "nodes")
+                new_link_path = self.get_zero_partition_path(n, 
self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes")
                 self.copy_and_replace_query(query_path, [new_link_path])
         for n in range(len(self.nodes)):
             for p in self.partitions:
@@ -290,7 +290,7 @@ class WeatherBenchmark:
                 prepare_path(query_path, reset)
             
                 # Copy query files.
-                partition_paths = get_partition_paths(n, p, self.base_paths, 
"data_links/" + test + "/" + str(n) + "nodes")
+                partition_paths = get_partition_paths(n, p, self.base_paths, 
self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes")
                 self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_local_query_files(self, test, reset):
@@ -301,14 +301,14 @@ class WeatherBenchmark:
             prepare_path(query_path, reset)
     
             # Copy query files.
-            new_link_path = self.get_zero_partition_path(0, "data_links/" + 
test)
+            new_link_path = self.get_zero_partition_path(0, 
self.DATA_LINKS_FOLDER + test)
             self.copy_and_replace_query(query_path, [new_link_path])
         for p in self.partitions:
             query_path = get_local_query_path(self.base_paths, test, p)
             prepare_path(query_path, reset)
     
             # Copy query files.
-            partition_paths = get_partition_paths(0, p, self.base_paths, 
"data_links/" + test)
+            partition_paths = get_partition_paths(0, p, self.base_paths, 
self.DATA_LINKS_FOLDER + test)
             self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_and_replace_query(self, query_path, replacement_list):
@@ -327,6 +327,13 @@ class WeatherBenchmark:
                 for line in fileinput.input(query_path + query_file, True):
                     sys.stdout.write(line.replace(self.QUERY_REPLACEMENT_KEY + 
collection, replace_string))
                     
+            # Make a search replace for partition type.
+            if self.dataset.get_partition_type() == "large_files":
+                for line in fileinput.input(query_path + query_file, True):
+                    sys.stdout.write(line.replace("/stationCollection", "/" + 
self.LARGE_FILE_ROOT_TAG + "/stationCollection"))
+                for line in fileinput.input(query_path + query_file, True):
+                    sys.stdout.write(line.replace("/dataCollection", "/" + 
self.LARGE_FILE_ROOT_TAG + "/dataCollection"))
+                    
     def get_number_of_slices(self):
         if len(self.dataset.get_tests()) == 0:
             print "No test has been defined in config file."

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index c18ac43..8ac6d17 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -209,13 +209,16 @@ def main(argv):
             if section == "partition_scheme":
                 benchmark.print_partition_scheme()
             else:
-                data.copy_to_n_partitions(xml_data_save_path, slices, 
base_paths, reset)
+                if dataset.get_partition_type() == "large_files":
+                    data.build_to_n_partition_files(xml_data_save_path, 
slices, base_paths, reset)
+                else:
+                    data.copy_to_n_partitions(xml_data_save_path, slices, 
base_paths, reset)
     
         if section in ("all", "test_links"):
             # TODO determine current node 
             print 'Processing the test links section (' + dataset.get_name() + 
').'
             benchmark.print_partition_scheme()
-            benchmark.build_data_links()
+            benchmark.build_data_links(reset)
 
         if section in ("all", "queries"):
             print 'Processing the queries section (' + dataset.get_name() + 
').'

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
index a6513c2..80607b8 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
@@ -41,9 +41,10 @@ class WeatherConfig:
         for node in self.config.getElementsByTagName("dataset"):
             name = self.get_dataset_name(node)
             save_paths = self.get_dataset_save_paths(node)
+            partition_type = self.get_dataset_partition_type(node)
             partitions = self.get_dataset_partitions(node)
             tests = self.get_dataset_tests(node)
-            nodes.append(Dataset(name, save_paths, partitions, tests))
+            nodes.append(Dataset(name, save_paths, partition_type, partitions, 
tests))
         return nodes
 
 
@@ -69,6 +70,9 @@ class WeatherConfig:
             paths.append(self.get_text(item))
         return paths
 
+    def get_dataset_partition_type(self, node):
+        return self.get_text(node.getElementsByTagName("partition_type")[0])
+
     def get_dataset_partitions(self, node):
         paths = []
         for item in node.getElementsByTagName("partitions_per_path"):
@@ -103,10 +107,11 @@ class Machine:
         return self.id + "(" + self.ip + ")"
     
 class Dataset:
-    def __init__(self, name, save_paths, partitions, tests):
+    def __init__(self, name, save_paths, partition_type, partitions, tests):
         self.name = name
         self.save_paths = save_paths
         self.partitions = partitions
+        self.partition_type = partition_type
         self.tests = tests
     
     def get_name(self):
@@ -118,6 +123,9 @@ class Dataset:
     def get_partitions(self):
         return self.partitions
     
+    def get_partition_type(self):
+        return self.partition_type
+    
     def get_tests(self):
         return self.tests
     

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
index a4f33a1..5db090a 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
@@ -150,10 +150,6 @@ class WeatherConvertToXML:
         # Default
         return 0
     
-    def process_one_day(self, records, report_date, page):
-        # Default
-        return 0
-    
     def process_station_data(self, row):
         # Default
         return 0
@@ -165,7 +161,7 @@ class WeatherConvertToXML:
         print "Processing inventory file"
         file_stream = open(self.ghcnd_inventory, 'r')
         
-        csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT',  'MAX_YEARS', 
'TOTAL_YEARS_FOR_ALL_SENSORS']
+        csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 
'TOTAL_YEARS_FOR_ALL_SENSORS']
         row = file_stream.readline()
         csv_inventory = {}
         for row in file_stream:
@@ -243,16 +239,6 @@ class WeatherConvertToXML:
     def convert_c2f(self, c):
         return (9 / 5 * c) + 32
     
-    def default_xml_start(self):
-        return textwrap.dedent("""\
-            <?xml version="1.0" encoding="ISO-8859-1"?>
-            <ghcnd_observation version="1.0"
-                 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
-                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-                <credit>NOAA's National Climatic Data Center (NCDC)</credit>
-                <credit_URL>http://www.ncdc.noaa.gov/</credit_URL>
-            """)
-    
     def default_xml_web_service_start(self):
         field_xml = ""
         field_xml += "<?xml version=\"1.0\" encoding=\"UTF-8\" 
standalone=\"yes\"?>\n"
@@ -273,76 +259,6 @@ class WeatherConvertToXML:
         field_xml += self.get_indent_space(indent) + "<date>" + 
str(report_date.year) + "-" + str(report_date.month).zfill(2) + "-" + 
str(report_date.day).zfill(2) + "T00:00:00.000</date>\n"
         return field_xml
     
-    def get_date_from_field(self, row, field):
-        report_date = self.get_field_from_definition(row, field)
-        return str(report_date.year) + "-" + str(report_date.month).zfill(2) + 
"-" + str(report_date.day).zfill(2)
-    
-    def default_xml_field_date_iso8601(self, report_date):
-        field_xml = ""
-        field_xml += "    <observation_date>" + self.MONTHS[report_date.month 
- 1] + " " + str(report_date.day) + ", " + str(report_date.year) + 
"</observation_date>\n"
-        field_xml += "    <observation_date_iso8601>" + 
report_date.isoformat() + "</observation_date_iso8601>\n"
-        return field_xml
-    
-    def default_xml_field_date_year(self, year):
-        field_xml = ""
-        field_xml += "    <observation_year>" + str(year) + 
"</observation_year>\n"
-        return field_xml
-
-    def default_xml_field_date_month(self, month):
-        field_xml = ""
-        field_xml += "    <observation_month>" + str(month) + 
"</observation_month>\n"
-        return field_xml
-
-    def default_xml_field_date_day(self, day):
-        field_xml = ""
-        field_xml += "            <observation_day>" + str(day) + 
"</observation_day>\n"
-        return field_xml
-    
-    def default_xml_field_station_id(self, station_id, indent=2):
-        field_xml = ""
-        field_xml += self.get_indent_space(indent) + "<station_id>" + 
station_id + "</station_id>\n"
-        return field_xml
-    
-    def default_xml_field_station(self, station_id):
-        station_row = ""
-        stations_file = open(self.ghcnd_stations, 'r')
-        
-        for line in stations_file:
-            if station_id == self.get_field_from_definition(line, 
STATIONS_FIELDS['ID']):
-                station_row = line
-                break
-        
-        field_xml = ""
-        field_xml += "    <station_id>" + station_id + "</station_id>\n"
-        field_xml += "    <location>\n"
-        field_xml += "        <latitude>" + 
self.get_field_from_definition(station_row, 
STATIONS_FIELDS['LATITUDE']).strip() + "</latitude>\n"
-        field_xml += "        <longitude>" + 
self.get_field_from_definition(station_row, 
STATIONS_FIELDS['LONGITUDE']).strip() + "</longitude>\n"
-        
-        elevation = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['ELEVATION']).strip()
-        if elevation != "-999.9":
-            field_xml += "        <elevation>" + elevation + "</elevation>\n"
-        
-        field_xml += "    </location>\n"
-        field_xml += "    <name>" + 
self.get_field_from_definition(station_row, STATIONS_FIELDS['NAME']).strip() + 
"</name>\n"
-    
-        state = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['STATE'])
-        if state.strip():
-            field_xml += "    <state>" + state + "</state>\n"
-    
-        gsn = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['GSNFLAG'])
-        if gsn.strip():
-            field_xml += "    <gsn />\n"
-    
-        hcn = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['HCNFLAG'])
-        if hcn.strip():
-            field_xml += "    <hcn />\n"
-    
-        wmoid = self.get_field_from_definition(station_row, 
STATIONS_FIELDS['WMOID'])
-        if wmoid.strip():
-            field_xml += "    <wmoid id=\"" + wmoid + "\" />\n"
-    
-        return field_xml
-    
     def default_xml_mshr_station_additional(self, station_id):
         """The web service station data is generate from the MSHR data 
supplemented with GHCN-Daily."""
         station_mshr_row = ""
@@ -492,120 +408,6 @@ class WeatherConvertToXML:
     def get_indent_space(self, indent):
         return (" " * (4 * indent))
     
-class WeatherDailyXMLFile(WeatherConvertToXML):
-    def process_one_month_sensor_set(self, records, page):
-        year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR))
-        month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH))
-    
-        station_id = self.get_dly_field(records[0], DLY_FIELD_ID)
-        
-        count = 0
-        for day in range(1, 32):
-            try:
-                # TODO find out what is a valid python date range? 1889?
-                # Attempt to see if this is valid date.
-                report_date = date(year, month, day)
-                save_file_name = self.process_one_day(records, report_date, 
page)
-                if save_file_name is not "":
-                    count = count + 1
-                    if self.debug_output:
-                        print "Wrote file: " + save_file_name
-            except ValueError:
-                if self.debug_output:
-                    print "Error: Not a valid date (" + str(month) + "/" + 
str(day) + "/" + str(year) + ") for " + station_id + "."
-                pass
-        return count
-
-    def process_one_day(self, records, report_date, page):
-        station_id = self.get_dly_field(records[0], DLY_FIELD_ID)
-        found_data = False
-                
-        # Information for each daily file.
-        daily_xml_file = self.default_xml_start()
-        daily_xml_file += self.default_xml_field_station(station_id)
-        daily_xml_file += self.default_xml_field_date_iso8601(report_date)
-        daily_xml_file += self.default_xml_start_tag("sensors")
-        for record in records:
-            record_xml_snip = self.default_xml_day_reading_as_field(record, 
report_date.day)
-            if record_xml_snip is not "":
-                found_data = True
-            daily_xml_file += record_xml_snip
-        daily_xml_file += self.default_xml_end_tag("sensors")
-        daily_xml_file += self.default_xml_end()
-        
-        if not found_data:
-            return ""
-        
-        # Make sure the station folder is available.
-        ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + 
station_id + "/" + str(report_date.year) + "/"
-        if not os.path.isdir(ghcnd_xml_station_path):
-            os.makedirs(ghcnd_xml_station_path)
-        
-        # Save XML string to disk.
-        save_file_name = ghcnd_xml_station_path + 
build_sensor_save_filename(station_id, report_date, page)
-        save_file_name = self.save_file(save_file_name, daily_xml_file)
-                
-        return save_file_name
-    
-class WeatherMonthlyXMLFile(WeatherConvertToXML):
-    def process_one_month_sensor_set(self, records, page):
-        found_data = False        
-        year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR))
-        month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH))
-    
-        station_id = self.get_dly_field(records[0], DLY_FIELD_ID)
-
-        # Information for each daily file.
-        daily_xml_file = self.default_xml_start()
-        daily_xml_file += self.default_xml_field_station(station_id)
-        daily_xml_file += self.default_xml_field_date_year(year)
-        daily_xml_file += self.default_xml_field_date_month(month)
-        daily_xml_file += self.default_xml_start_tag("readings")
-        
-        for day in range(1, 32):
-            try:
-                # TODO find out what is a valid python date range? 1889?
-                # Attempt to see if this is valid date.
-                report_date = date(year, month, day)
-                found_daily_data = False
-                record_xml_snip = ""
-
-                for record in records:
-                    record_xml_snip += 
self.default_xml_day_reading_as_field(record, report_date.day)
-                    if record_xml_snip is not "":
-                        found_data = True
-                        found_daily_data = True
-
-                if found_daily_data:
-                    daily_xml_file += self.default_xml_start_tag("reading", 2)
-                    daily_xml_file += self.default_xml_field_date_day(day)
-                    daily_xml_file += record_xml_snip
-                    daily_xml_file += self.default_xml_end_tag("reading", 2)
-                
-            except ValueError:
-                pass
-
-        daily_xml_file += self.default_xml_end_tag("readings")
-        daily_xml_file += self.default_xml_end()
-
-        if not found_data:
-            return 0
-
-        # Make sure the station folder is available.
-        ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + 
station_id + "/" + str(report_date.year) + "/"
-        if not os.path.isdir(ghcnd_xml_station_path):
-            os.makedirs(ghcnd_xml_station_path)
-                
-        # Save XML string to disk.
-        save_file_name = ghcnd_xml_station_path + 
build_sensor_save_filename(station_id, report_date, page)
-        save_file_name = self.save_file(save_file_name, daily_xml_file)
-
-        if save_file_name is not "":
-            if self.debug_output:
-                print "Wrote file: " + save_file_name
-            return 1
-        else:
-            return 0
 
 class WeatherWebServiceMonthlyXMLFile(WeatherConvertToXML):
     """The web service class details how to create files similar to the NOAA 
web service."""

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/17a08527/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index a7fb691..1c9f129 100644
--- 
a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ 
b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -26,6 +26,8 @@ from collections import OrderedDict
 # Allows partition and picking up where you left off.
 class WeatherDataFiles:
 
+    LARGE_FILE_ROOT_TAG = "root"
+
     INDEX_DATA_FILE_NAME = 0
     INDEX_DATA_SENSORS_STATUS = 1
     INDEX_DATA_STATION_STATUS = 2
@@ -51,14 +53,10 @@ class WeatherDataFiles:
         self.current = self.DATA_FILE_START_INDEX
         self.progress_data = []
 
-        
-    def get_file_list(self):
-        return glob.glob(self.base_path + "/*" + self.DATA_FILE_EXTENSION)
-
     def get_file_list_iterator(self):
+        """Return the list of files one at a time."""
         return glob.iglob(self.base_path + "/*" + self.DATA_FILE_EXTENSION)
 
-
     # Save Functions
     def build_progress_file(self, options, convert):
         if not os.path.isfile(self.progress_file_name) or 'reset' in options:
@@ -98,10 +96,8 @@ class WeatherDataFiles:
             return
         
         # Initialize the partition paths.
-        partition_sizes = []
         partition_paths = get_partition_paths(0, partitions, base_paths)
         for path in partition_paths:
-            partition_sizes.append(0)
             # Make sure the xml folder is available.
             prepare_path(path, reset)
 
@@ -145,66 +141,72 @@ class WeatherDataFiles:
             if current_station_partition >= len(partition_paths):
                 current_station_partition = 0
 
-    
-    def copy_to_n_partitions_by_station(self, save_path, partitions, 
base_paths, reset):
-        """Once the initial data has been generated, the data can be copied 
into a set number of partitions. """
+    def build_to_n_partition_files(self, save_path, partitions, base_paths, 
reset):
+        """Once the initial data has been generated, the data can be divided 
into partitions 
+        and stored in single files.
+        """
         if (len(base_paths) == 0):
             return
         
+        XML_START = "<?xml version=\"1.0\" encoding=\"UTF-8\" 
standalone=\"yes\"?>"
+        
         # Initialize the partition paths.
-        partition_sizes = []
         partition_paths = get_partition_paths(0, partitions, base_paths)
+        sensors_partition_files = []
+        stations_partition_files = []
         for path in partition_paths:
-            partition_sizes.append(0)
             # Make sure the xml folder is available.
             prepare_path(path, reset)
+            prepare_path(path + "sensors/", False)
+            prepare_path(path + "stations/", False)
+            sensors_partition_files.append(open(path + 
"sensors/partition.xml", 'w'))
+            stations_partition_files.append(open(path + 
"stations/partition.xml", 'w'))
+    
+        for row in range(0, len(partition_paths)):
+            sensors_partition_files[row].write(XML_START + "<" + 
self.LARGE_FILE_ROOT_TAG + ">\n")
+            stations_partition_files[row].write(XML_START + "<" + 
self.LARGE_FILE_ROOT_TAG + ">\n")
 
-        # copy stations and sensors into each partition
-        current_partition = 0
-        csv_sorted = self.get_csv_in_partition_order()
-        for item, size in csv_sorted.iteritems():
-            if size < 0:
-                print "The progress file does not have the sensor size data 
saved."
-                return
-            
-            station_id = item.split('.')[0]
-            # Update partition bases on smallest current size.
-            current_partition = partition_sizes.index(min(partition_sizes))
-            
-            # Copy sensor files
-            type = "sensors"
-            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id
-            new_file_path = 
build_base_save_folder(partition_paths[current_partition], station_id, type) + 
station_id
-            if os.path.isdir(file_path):
-                distutils.dir_util.copy_tree(file_path, new_file_path)
-            partition_sizes[current_partition] += size
+        import fnmatch
+        import os
         
-            # Copy station files
-            type = "stations"
-            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id + ".xml"
-            new_file_base = 
build_base_save_folder(partition_paths[current_partition], station_id, type)
-            new_file_path = new_file_base + station_id + ".xml"
-            if os.path.isfile(file_path):
-                if not os.path.isdir(new_file_base):
-                    os.makedirs(new_file_base)
-                shutil.copyfile(file_path, new_file_path)
-    
-    def get_csv_in_partition_order(self):
+        # copy stations and sensors into each partition
+        current_sensor_partition = 0
+        current_station_partition = 0
         self.open_progress_data()
         row_count = len(self.progress_data)
-        
-        # Get the dictionary of all the files and data sizes.
-        csv_dict = dict()
         for row in range(0, row_count):
             row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
             file_name = row_contents[self.INDEX_DATA_FILE_NAME]
-            folder_data = int(row_contents[self.INDEX_DATA_FOLDER_DATA])
+            station_id = os.path.basename(file_name).split('.')[0]
+               
+            # Copy sensor files
+            type = "sensors"
+            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id
+            for root, dirnames, filenames in os.walk(file_path):
+                for filename in fnmatch.filter(filenames, '*.xml'):
+                    xml_path = os.path.join(root, filename)
+                    xml_data = file_get_contents(xml_path).replace(XML_START, 
"") + "\n"
+                    
sensors_partition_files[current_sensor_partition].write(xml_data)
+                    current_sensor_partition += 1
+                    if current_sensor_partition >= 
len(sensors_partition_files):
+                        current_sensor_partition = 0
             
-            csv_dict[file_name] = folder_data
-        
-        # New sorted list.
-        return OrderedDict(sorted(csv_dict.items(), key=lambda x: x[1], 
reverse=True))
-        
+            # Copy station files
+            type = "stations"
+            file_path = build_base_save_folder(save_path, station_id, type) + 
station_id + ".xml"
+            xml_path = os.path.join(root, file_path)
+            xml_data = file_get_contents(xml_path).replace(XML_START, "") + 
"\n"
+            stations_partition_files[current_station_partition].write(xml_data)
+            current_station_partition += 1
+            if current_station_partition >= len(partition_paths):
+                current_station_partition = 0
+                
+        for row in range(0, len(partition_paths)):
+            sensors_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG 
+ ">\n")
+            sensors_partition_files[row].close()
+            stations_partition_files[row].write("</" + 
self.LARGE_FILE_ROOT_TAG + ">\n")
+            stations_partition_files[row].close()
+
     def get_file_row(self, file_name):
         for i in range(0, len(self.progress_data)):
             if self.progress_data[i].startswith(file_name):
@@ -405,3 +407,6 @@ def prepare_path(path, reset):
     if not os.path.isdir(path):
         os.makedirs(path)
 
+def file_get_contents(filename):
+    with open(filename) as f:
+        return f.read()

Reply via email to