[ https://issues.apache.org/jira/browse/SDAP-95?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16619442#comment-16619442 ]

ASF GitHub Bot commented on SDAP-95:
------------------------------------

fgreg closed pull request #33: SDAP-95 Clean up and document TimeAvgMapSpark
URL: https://github.com/apache/incubator-sdap-nexus/pull/33

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of provenance:

diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 19de786..473f4ce 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -15,131 +15,152 @@
 
 
 import logging
+from datetime import datetime
 
 import numpy as np
+import shapely.geometry
 from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
 
-# from time import time
-from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.NexusHandler import nexus_handler, SparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
 
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
 
 @nexus_handler
 class TimeAvgMapSparkHandlerImpl(SparkHandler):
     name = "Time Average Map Spark"
     path = "/timeAvgMapSpark"
     description = "Computes a Latitude/Longitude Time Average plot given an 
arbitrary geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "String",
+            "description": "The dataset used to generate the map. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) 
Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude. Required"
+        },
+        "spark": {
+            "name": "Spark Configuration",
+            "type": "comma-delimited value",
+            "description": "Configuration used to launch in the Spark cluster. 
Value should be 3 elements separated by "
+                           "commas. 1) Spark Master 2) Number of Spark 
Executors 3) Number of Spark Partitions. Only "
+                           "Number of Spark Partitions is used by this 
function. Optional (Default: local,1,1)"
+        }
+    }
     singleton = True
 
     def __init__(self):
         SparkHandler.__init__(self)
         self.log = logging.getLogger(__name__)
-        # self.log.setLevel(logging.DEBUG)
 
-    @staticmethod
-    def _map(tile_in_spark):
-        tile_bounds = tile_in_spark[0]
-        (min_lat, max_lat, min_lon, max_lon,
-         min_y, max_y, min_x, max_x) = tile_bounds
-        startTime = tile_in_spark[1]
-        endTime = tile_in_spark[2]
-        ds = tile_in_spark[3]
-        tile_service = NexusTileService()
-        # print 'Started tile {0}'.format(tile_bounds)
-        # sys.stdout.flush()
-        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
-        # days_at_a_time = 90
-        days_at_a_time = 30
-        # days_at_a_time = 7
-        # days_at_a_time = 1
-        # print 'days_at_a_time = {0}'.format(days_at_a_time)
-        t_incr = 86400 * days_at_a_time
-        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
-        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
-        t_start = startTime
-        while t_start <= endTime:
-            t_end = min(t_start + t_incr, endTime)
-            # t1 = time()
-            # print 'nexus call start at time {0}'.format(t1)
-            # sys.stdout.flush()
-            # nexus_tiles = \
-            #    TimeAvgMapSparkHandlerImpl.query_by_parts(tile_service,
-            #                                              min_lat, max_lat, 
-            #                                              min_lon, max_lon, 
-            #                                              ds, 
-            #                                              t_start, 
-            #                                              t_end,
-            #                                              part_dim=2)
-            nexus_tiles = \
-                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
-                                                      min_lon, max_lon,
-                                                      ds=ds,
-                                                      start_time=t_start,
-                                                      end_time=t_end)
-            # t2 = time()
-            # print 'nexus call end at time %f' % t2
-            # print 'secs in nexus call: ', t2 - t1
-            # print 't %d to %d - Got %d tiles' % (t_start, t_end,
-            #                                     len(nexus_tiles))
-            # for nt in nexus_tiles:
-            #    print nt.granule
-            #    print nt.section_spec
-            #    print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes)
-            #    print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes)
-            # sys.stdout.flush()
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
 
-            for tile in nexus_tiles:
-                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
-                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
-                cnt_tile += (~tile.data.mask[0,
-                              min_y:max_y + 1,
-                              min_x:max_x + 1]).astype(np.uint8)
-            t_start = t_end + 1
+        try:
+            ds = request.get_dataset()
+            if type(ds) == list or type(ds) == tuple:
+                ds = next(iter(ds))
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be a string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on 
a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+                                           request.get_max_lon(), request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited 
float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) 
Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value 
seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value 
seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. 
Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), 
request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+               spark_master, spark_nexecs, spark_nparts
 
-        # print 'cnt_tile = ', cnt_tile
-        # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
-        # sum_tile.mask = cnt_tile.mask
-        # avg_tile = sum_tile / cnt_tile
-        # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])]
-        # print 'Finished tile', tile_bounds
-        # print 'Tile avg = ', avg_tile
-        # sys.stdout.flush()
-        return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
-
-    def calc(self, computeOptions, **args):
+    def calc(self, compute_options, **args):
         """
 
-        :param computeOptions: StatsComputeOptions
+        :param compute_options: StatsComputeOptions
         :param args: dict
         :return:
         """
 
-        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
-        self._setQueryParams(computeOptions.get_dataset()[0],
-                             (float(computeOptions.get_min_lat()),
-                              float(computeOptions.get_max_lat()),
-                              float(computeOptions.get_min_lon()),
-                              float(computeOptions.get_max_lon())),
-                             computeOptions.get_start_time(),
-                             computeOptions.get_end_time(),
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+        compute_options.get_spark_cfg()
+
+        self._setQueryParams(ds,
+                             (float(bbox.bounds[1]),
+                              float(bbox.bounds[3]),
+                              float(bbox.bounds[0]),
+                              float(bbox.bounds[2])),
+                             start_time,
+                             end_time,
                              spark_master=spark_master,
                              spark_nexecs=spark_nexecs,
                              spark_nparts=spark_nparts)
 
-        if 'CLIM' in self._ds:
-            raise NexusProcessingException(
-                reason="Cannot compute Latitude/Longitude Time Average plot on 
a climatology", code=400)
-
         nexus_tiles = self._find_global_tile_set()
-        # print 'tiles:'
-        # for tile in nexus_tiles:
-        #     print tile.granule
-        #     print tile.section_spec
-        #     print 'lat:', tile.latitudes
-        #     print 'lon:', tile.longitudes
-
-        #                                                          nexus_tiles)
+
         if len(nexus_tiles) == 0:
             raise NoDataException(reason="No data found for selected timeframe")
 
@@ -152,14 +173,11 @@ def calc(self, computeOptions, **args):
         self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
                                                               self._maxLonCent))
 
-        # for tile in nexus_tiles:
-        #    print 'lats: ', tile.latitudes.compressed()
-        #    print 'lons: ', tile.longitudes.compressed()
         # Create array of tuples to pass to Spark map function
         nexus_tiles_spark = [[self._find_tile_bounds(t),
                               self._startTime, self._endTime,
                               self._ds] for t in nexus_tiles]
-        # print 'nexus_tiles_spark = ', nexus_tiles_spark
+
         # Remove empty tiles (should have bounds set to None)
         bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
         for i in np.flipud(bad_tile_inds):
@@ -168,7 +186,7 @@ def calc(self, computeOptions, **args):
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
         num_time_parts = 72
-        # num_time_parts = 1
+
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
         self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
 
@@ -183,9 +201,6 @@ def calc(self, computeOptions, **args):
                       len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
         self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
-        # print 'nexus_tiles_spark final = '
-        # for i in range(len(nexus_tiles_spark)):
-        #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
         rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
@@ -246,13 +261,48 @@ def calc(self, computeOptions, **args):
         self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
 
         # Create dict for JSON response
-        results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+        results = [[{'mean': a[y, x], 'cnt': int(n[y, x]),
                      'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
                     for x in range(a.shape[1])] for y in range(a.shape[0])]
 
-        return TimeAvgMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+        return NexusResults(results=results, meta={}, stats=None,
+                            computeOptions=None, minLat=bbox.bounds[1],
+                            maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
+                            maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
+                            endTime=end_time)
+
+    @staticmethod
+    def _map(tile_in_spark):
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        tile_service = NexusTileService()
+
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+
+        days_at_a_time = 30
+
+        t_incr = 86400 * days_at_a_time
+        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+
+            nexus_tiles = \
+                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                      min_lon, max_lon,
+                                                      ds=ds,
+                                                      start_time=t_start,
+                                                      end_time=t_end)
 
+            for tile in nexus_tiles:
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+                cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 1]).astype(np.uint8)
+            t_start = t_end + 1
 
-class TimeAvgMapSparkResults(NexusResults):
-    def __init__(self, results=None, meta=None, computeOptions=None):
-        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
+        return (min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)
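
For reference, a request exercising the parameters documented in the new
`params` spec might look like the sketch below. The host, port, and
dataset name are hypothetical; only the parameter names and formats come
from the spec in the diff above:

    import requests

    # Hypothetical host/port and dataset name; parameter names and
    # formats follow the `params` spec documented in the diff above.
    params = {
        'ds': 'AVHRR_OI_L4_GHRSST_NCEI',      # dataset short name (hypothetical)
        'startTime': '2013-01-01T00:00:00Z',  # ISO 8601 or seconds since epoch
        'endTime': '2013-12-31T23:59:59Z',
        'b': '-150.0,40.0,-120.0,55.0',       # min lon, min lat, max lon, max lat
        'spark': 'local,1,8',                 # master, executors, partitions
    }
    response = requests.get('http://localhost:8083/timeAvgMapSpark', params=params)
    response.raise_for_status()
    print(response.json())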

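parse_arguments accepts either a bounding polygon from the request or the
comma-delimited 'b' argument, and normalizes both to a shapely polygon
whose .bounds tuple is (min lon, min lat, max lon, max lat), the same
order calc later reads back via bbox.bounds. A minimal sketch of that
equivalence:

    import shapely.geometry

    west, south, east, north = -150.0, 40.0, -120.0, 55.0
    bbox = shapely.geometry.Polygon(
        [(west, south), (east, south), (east, north), (west, north), (west, south)])
    print(bbox.bounds)  # (-150.0, 40.0, -120.0, 55.0)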

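The EPOCH and ISO_8601 constants added at the top of the module turn the
parsed timezone-aware datetimes into integer seconds since the Unix epoch
(the module targets Python 2, hence long()). The same conversion as a
small Python 3 sketch using int():

    from datetime import datetime
    from pytz import timezone

    EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))

    def to_epoch_seconds(dt):
        # dt must be timezone-aware or the subtraction raises TypeError
        return int((dt - EPOCH).total_seconds())

    # Example: 2013-06-01T00:00:00Z -> 1370044800
    print(to_epoch_seconds(timezone('UTC').localize(datetime(2013, 6, 1))))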
 
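In calc, the tile list is duplicated num_time_parts times with np.repeat
and each copy is assigned one slice of the requested time range, so each
Spark task processes one (tile, time slice) pair. A simplified sketch of
that fan-out (variable names are illustrative, not from the PR):

    import numpy as np

    start, end = 0, 86400 * 90  # 90 days expressed in seconds
    num_time_parts = 3
    tiles = ['tile-A', 'tile-B']

    # Carve [start, end] into num_time_parts contiguous sub-ranges.
    edges = np.linspace(start, end, num_time_parts + 1, dtype=np.int64)
    time_ranges = list(zip(edges[:-1], edges[1:]))

    # Pair every tile with every time slice; each pair is one Spark task.
    tasks = [(tile, int(t0), int(t1)) for tile in tiles for (t0, t1) in time_ranges]
    print(len(tasks))  # 6 = 2 tiles x 3 time slices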

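_map returns a per-cell (sum, count) pair instead of a mean so that
partial results from different time slices can be reduced in any order;
the final mean is sum/count wherever count is nonzero. An illustrative
sketch of that combine-then-divide pattern (the names here are not from
the PR):

    import numpy as np

    def combine(a, b):
        # (sum, count) pairs add elementwise; addition is associative and
        # commutative, so Spark may reduce partial results in any order.
        return a[0] + b[0], a[1] + b[1]

    part1 = (np.array([[3.0, 0.0]]), np.array([[2, 0]], dtype=np.uint32))
    part2 = (np.array([[1.0, 5.0]]), np.array([[1, 1]], dtype=np.uint32))
    sums, counts = combine(part1, part2)
    with np.errstate(invalid='ignore', divide='ignore'):
        mean = np.where(counts > 0, sums / counts, np.nan)  # empty cells stay NaN
    print(mean)  # [[1.33333333 5.        ]]
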
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TimeAvgMapSpark
> ---------------
>
>                 Key: SDAP-95
>                 URL: https://issues.apache.org/jira/browse/SDAP-95
>             Project: Apache Science Data Analytics Platform
>          Issue Type: Sub-task
>          Components: nexus
>            Reporter: Frank Greguska
>            Priority: Minor
>
> Determine the necessary updates to the API and document it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
