jjacob7734 closed pull request #50: SDAP-151 Determine parallelism automatically for Spark analytics
URL: https://github.com/apache/incubator-sdap-nexus/pull/50
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
index eb6373f..fd3cb4b 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -320,8 +320,7 @@ def set_config(self, algorithm_config):
 
     def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
                         start_year=None, end_year=None, clim_month=None,
-                        fill=-9999., spark_master=None, spark_nexecs=None,
-                        spark_nparts=None):
+                        fill=-9999.):
         self._ds = ds
         self._minLat, self._maxLat, self._minLon, self._maxLon = bounds
         self._startTime = start_time
@@ -330,10 +329,7 @@ def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
         self._endYear = end_year
         self._climMonth = clim_month
         self._fill = fill
-        self._spark_master = spark_master
-        self._spark_nexecs = spark_nexecs
-        self._spark_nparts = spark_nparts
-
+        
     def _set_info_from_tile_set(self, nexus_tiles):
         ntiles = len(nexus_tiles)
         self.log.debug('Attempting to extract info from {0} tiles'.\
@@ -578,6 +574,13 @@ def _create_nc_file_latlon2d(self, a, fname, varname, varunits=None,
     def _create_nc_file(self, a, fname, varname, **kwargs):
         self._create_nc_file_latlon2d(a, fname, varname, **kwargs)
 
+    def _spark_nparts(self, nparts_requested):
+        max_parallelism = 128
+        num_partitions = min(nparts_requested if nparts_requested > 0
+                             else self._sc.defaultParallelism,
+                             max_parallelism)
+        return num_partitions
+
 
 def executeInitializers(config):
     [wrapper.init(config) for wrapper in AVAILABLE_INITIALIZERS]
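
For readers skimming the diff: the new NexusHandler._spark_nparts() helper above replaces the old per-request Spark configuration. It honors a positive partition count requested by the caller, otherwise falls back to the SparkContext's defaultParallelism, and caps the result at 128. A minimal standalone sketch of that logic (assuming only a live SparkContext named sc; the name pick_num_partitions is illustrative, not from the PR):

    def pick_num_partitions(sc, nparts_requested, max_parallelism=128):
        # Use the caller's request when it is positive, otherwise fall back
        # to the cluster's default parallelism; never exceed the cap.
        candidate = nparts_requested if nparts_requested > 0 else sc.defaultParallelism
        return min(candidate, max_parallelism)

    # pick_num_partitions(sc, 0)  -> sc.defaultParallelism, capped at 128
    # pick_num_partitions(sc, 16) -> 16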
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py
index eb567f5..75c7b73 100644
--- a/analysis/webservice/algorithms_spark/ClimMapSpark.py
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import math
 import logging
 from calendar import timegm, monthrange
 from datetime import datetime
@@ -120,7 +121,6 @@ def calc(self, computeOptions, **args):
         :return:
         """
 
-        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
         self._setQueryParams(computeOptions.get_dataset()[0],
                              (float(computeOptions.get_min_lat()),
                               float(computeOptions.get_max_lat()),
@@ -128,10 +128,7 @@ def calc(self, computeOptions, **args):
                               float(computeOptions.get_max_lon())),
                              start_year=computeOptions.get_start_year(),
                              end_year=computeOptions.get_end_year(),
-                             clim_month=computeOptions.get_clim_month(),
-                             spark_master=spark_master,
-                             spark_nexecs=spark_nexecs,
-                             spark_nparts=spark_nparts)
+                             clim_month=computeOptions.get_clim_month())
         self._startTime = timegm((self._startYear, 1, 1, 0, 0, 0))
         self._endTime = timegm((self._endYear, 12, 31, 23, 59, 59))
 
@@ -139,6 +136,8 @@ def calc(self, computeOptions, **args):
             raise NexusProcessingException(reason="Cannot compute Latitude/Longitude Time Average map on a climatology",
                                            code=400)
 
+        nparts_requested = computeOptions.get_nparts()
+
         nexus_tiles = self._find_global_tile_set()
         # print 'tiles:'
         # for tile in nexus_tiles:
@@ -199,7 +198,9 @@ def calc(self, computeOptions, **args):
         #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
-        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        spark_nparts = self._spark_nparts(nparts_requested)
+        self.log.info('Using {} partitions'.format(spark_nparts))
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
         sum_count_part = rdd.map(self._map)
         sum_count = \
             sum_count_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py
index c6b0c99..9503298 100644
--- a/analysis/webservice/algorithms_spark/CorrMapSpark.py
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -15,8 +15,9 @@
 
 
 import json
+import math
 import logging
-
+from datetime import datetime
 import numpy as np
 from nexustiles.nexustiles import NexusTileService
 
@@ -164,17 +165,14 @@ def _map(tile_in):
 
     def calc(self, computeOptions, **args):
 
-        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
         self._setQueryParams(computeOptions.get_dataset(),
                              (float(computeOptions.get_min_lat()),
                               float(computeOptions.get_max_lat()),
                               float(computeOptions.get_min_lon()),
                               float(computeOptions.get_max_lon())),
                              computeOptions.get_start_time(),
-                             computeOptions.get_end_time(),
-                             spark_master=spark_master,
-                             spark_nexecs=spark_nexecs,
-                             spark_nparts=spark_nparts)
+                             computeOptions.get_end_time())
+        nparts_requested = computeOptions.get_nparts()
 
         self.log.debug('ds = {0}'.format(self._ds))
         if not len(self._ds) == 2:
@@ -200,6 +198,20 @@ def calc(self, computeOptions, **args):
         self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
         self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
 
+        daysinrange = self._tile_service.find_days_in_range_asc(self._minLat,
+                                                                self._maxLat,
+                                                                self._minLon,
+                                                                self._maxLon,
+                                                                self._ds[0],
+                                                                self._startTime,
+                                                                self._endTime)
+        ndays = len(daysinrange)
+        if ndays == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+        self.log.debug('Found {0} days in range'.format(ndays))
+        for i, d in enumerate(daysinrange):
+            self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
+
         # Create array of tuples to pass to Spark map function
         nexus_tiles_spark = [[self._find_tile_bounds(t),
                               self._startTime, self._endTime,
@@ -212,30 +224,20 @@ def calc(self, computeOptions, **args):
 
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
-        num_time_parts = 72
-        # num_time_parts = 2
-        # num_time_parts = 1
+        # Set the time boundaries for each of the Spark map tuples so that
+        # every Nth element in the array gets the same time bounds.
+        max_time_parts = 72
+        num_time_parts = min(max_time_parts, ndays)
+
+        spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
-        self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
-
-        # Set the time boundaries for each of the Spark map tuples.
-        # Every Nth element in the array gets the same time bounds.
-        spark_part_times = np.linspace(self._startTime, self._endTime + 1,
-                                       num_time_parts + 1, dtype=np.int64)
-
-        spark_part_time_ranges = \
-            np.repeat([[[spark_part_times[i],
-                         spark_part_times[i + 1] - 1] for i in range(num_time_parts)]],
-                      len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
-        self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
-        # print 'nexus_tiles_spark final = '
-        # for i in range(len(nexus_tiles_spark)):
-        #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
-        # print 'nexus_tiles_spark=',nexus_tiles_spark
-        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        spark_nparts = self._spark_nparts(nparts_requested)
+        self.log.info('Using {} partitions'.format(spark_nparts))
+
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
         sum_tiles_part = rdd.map(self._map)
         # print "sum_tiles_part = ",sum_tiles_part.collect()
         sum_tiles = \
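
The CorrMapSpark change above also reworks how the time dimension is carved up: instead of slicing the [start, end] interval into 72 equal pieces with np.linspace, the handler now splits the actual list of days returned by find_days_in_range_asc into at most 72 chunks and uses each chunk's first and last day as the per-tuple time bounds. A small numpy sketch of the new bounds computation (the day values are made up):

    import numpy as np

    daysinrange = np.arange(0, 100)              # stand-in for epoch-second day timestamps
    num_time_parts = min(72, len(daysinrange))   # never more parts than days

    # [first_day, last_day] for each roughly equal-sized chunk
    chunk_bounds = np.array([chunk[[0, -1]]
                             for chunk in np.array_split(daysinrange, num_time_parts)])
    # chunk_bounds has shape (num_time_parts, 2); np.tile/np.repeat then pair
    # every tile entry with every one of these [start, end] ranges.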
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 473f4ce..9b00489 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import math
 import logging
 from datetime import datetime
 
@@ -128,13 +129,12 @@ def parse_arguments(self, request):
                     request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
                 code=400)
 
-        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+        nparts_requested = request.get_nparts()
 
         start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
 
-        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
-               spark_master, spark_nexecs, spark_nparts
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, nparts_requested
 
     def calc(self, compute_options, **args):
         """
@@ -144,20 +144,14 @@ def calc(self, compute_options, **args):
         :return:
         """
 
-        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
-
-        compute_options.get_spark_cfg()
-
+        ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)
         self._setQueryParams(ds,
                              (float(bbox.bounds[1]),
                               float(bbox.bounds[3]),
                               float(bbox.bounds[0]),
                               float(bbox.bounds[2])),
                              start_time,
-                             end_time,
-                             spark_master=spark_master,
-                             spark_nexecs=spark_nexecs,
-                             spark_nparts=spark_nparts)
+                             end_time)
 
         nexus_tiles = self._find_global_tile_set()
 
@@ -165,6 +159,22 @@ def calc(self, compute_options, **args):
             raise NoDataException(reason="No data found for selected timeframe")
 
         self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+        print('Found {} tiles'.format(len(nexus_tiles)))
+
+        daysinrange = self._tile_service.find_days_in_range_asc(bbox.bounds[1],
+                                                                bbox.bounds[3],
+                                                                bbox.bounds[0],
+                                                                bbox.bounds[2],
+                                                                ds,
+                                                                start_time,
+                                                                end_time)
+        ndays = len(daysinrange)
+        if ndays == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+        self.log.debug('Found {0} days in range'.format(ndays))
+        for i, d in enumerate(daysinrange):
+            self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
+
 
         self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
         self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
@@ -185,25 +195,20 @@ def calc(self, compute_options, **args):
 
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
-        num_time_parts = 72
+        # Set the time boundaries for each of the Spark map tuples so that
+        # every Nth element in the array gets the same time bounds.
+        max_time_parts = 72
+        num_time_parts = min(max_time_parts, ndays)
 
+        spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
-        self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
-
-        # Set the time boundaries for each of the Spark map tuples.
-        # Every Nth element in the array gets the same time bounds.
-        spark_part_times = np.linspace(self._startTime, self._endTime,
-                                       num_time_parts + 1, dtype=np.int64)
-
-        spark_part_time_ranges = \
-            np.repeat([[[spark_part_times[i],
-                         spark_part_times[i + 1]] for i in range(num_time_parts)]],
-                      len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
-        self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
 
         # Launch Spark computations
-        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        spark_nparts = self._spark_nparts(nparts_requested)
+        self.log.info('Using {} partitions'.format(spark_nparts))
+
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
         sum_count_part = rdd.map(self._map)
         sum_count = \
             sum_count_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index a24c2d5..4a102aa 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import math
 import calendar
 import itertools
 import logging
@@ -153,13 +154,12 @@ def parse_arguments(self, request):
         apply_seasonal_cycle_filter = request.get_apply_seasonal_cycle_filter(default=False)
         apply_low_pass_filter = request.get_apply_low_pass_filter()
 
-        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
-
         start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
 
-        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
-               apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, spark_nexecs, spark_nparts
+        nparts_requested = request.get_nparts()
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested
 
     def calc(self, request, **args):
         """
@@ -169,9 +169,7 @@ def calc(self, request, **args):
         :return:
         """
 
-        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
-        apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, \
-        spark_nexecs, spark_nparts = self.parse_arguments(request)
+        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(request)
 
         resultsRaw = []
 
@@ -194,11 +192,12 @@ def calc(self, request, **args):
             self.log.debug('Found {0} days in range'.format(ndays))
             for i, d in enumerate(daysinrange):
                 self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
-            spark_nparts_needed = min(spark_nparts, ndays)
-
+            spark_nparts = self._spark_nparts(nparts_requested)
+            self.log.info('Using {} partitions'.format(spark_nparts))
             the_time = datetime.now()
-            results, meta = spark_driver(daysinrange, bounding_polygon, shortName,
-                                         spark_nparts_needed=spark_nparts_needed, sc=self._sc)
+            results, meta = spark_driver(daysinrange, bounding_polygon,
+                                         shortName, spark_nparts=spark_nparts,
+                                         sc=self._sc)
             self.log.info(
                 "Time series calculation took %s for dataset %s" % 
(str(datetime.now() - the_time), shortName))
 
@@ -487,15 +486,15 @@ def createLinePlot(self):
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
+def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999.,
+                 spark_nparts=1, sc=None):
     nexus_tiles_spark = [(bounding_polygon.wkt, ds,
                           list(daysinrange_part), fill)
                          for daysinrange_part
-                         in np.array_split(daysinrange,
-                                           spark_nparts_needed)]
+                         in np.array_split(daysinrange, spark_nparts)]
 
     # Launch Spark computations
-    rdd = sc.parallelize(nexus_tiles_spark, spark_nparts_needed)
+    rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
     results = rdd.map(calc_average_on_day).collect()
     results = list(itertools.chain.from_iterable(results))
     results = sorted(results, key=lambda entry: entry["time"])
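
The spark_driver change in TimeSeriesSpark.py is mostly a rename (spark_nparts_needed becomes spark_nparts), but it is worth noting that np.array_split already tolerates a partition count that does not evenly divide the number of days. A small illustration with made-up values:

    import numpy as np

    daysinrange = list(range(10))        # stand-in for 10 day timestamps
    spark_nparts = 4
    chunks = np.array_split(daysinrange, spark_nparts)
    # -> chunk sizes 3, 3, 2, 2; each chunk becomes one Spark map tuple and
    #    sc.parallelize(nexus_tiles_spark, spark_nparts) spreads them across
    #    spark_nparts partitions.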
diff --git a/analysis/webservice/webmodel.py b/analysis/webservice/webmodel.py
index feeb019..0f98c30 100644
--- a/analysis/webservice/webmodel.py
+++ b/analysis/webservice/webmodel.py
@@ -51,7 +51,7 @@ class RequestParameters(object):
     ORDER = "lpOrder"
     PLOT_SERIES = "plotSeries"
     PLOT_TYPE = "plotType"
-    SPARK_CFG = "spark"
+    NPARTS = "nparts"
     METADATA_FILTER = "metadataFilter"
 
 
@@ -79,12 +79,6 @@ def __init__(self, reason="Dataset not found"):
         NexusProcessingException.__init__(self, StandardNexusErrors.DATASET_MISSING, reason, code=404)
 
 
-class SparkConfig(object):
-    MAX_NUM_EXECS = 64
-    MAX_NUM_PARTS = 8192
-    DEFAULT = "local,1,1"
-
-
 class StatsComputeOptions(object):
     def __init__(self):
         pass
@@ -149,7 +143,7 @@ def get_plot_series(self, default="mean"):
     def get_plot_type(self, default="default"):
         raise Exception("Please implement")
 
-    def get_spark_cfg(self, default=SparkConfig.DEFAULT):
+    def get_nparts(self):
         raise Exception("Please implement")
 
 
@@ -343,25 +337,8 @@ def get_plot_series(self, default="mean"):
     def get_plot_type(self, default="default"):
         return self.get_argument(RequestParameters.PLOT_TYPE, default=default)
 
-    def get_spark_cfg(self, default=SparkConfig.DEFAULT):
-        arg = self.get_argument(RequestParameters.SPARK_CFG, default)
-        try:
-            master, nexecs, nparts = arg.split(',')
-        except:
-            raise ValueError('Invalid spark configuration: %s' % arg)
-        if master not in ("local", "yarn", "mesos"):
-            raise ValueError('Invalid spark master: %s' % master)
-        nexecs = int(nexecs)
-        if (nexecs < 1) or (nexecs > SparkConfig.MAX_NUM_EXECS):
-            raise ValueError('Invalid number of Spark executors: %d (must be between 1 and %d)' % (
-            nexecs, SparkConfig.MAX_NUM_EXECS))
-        nparts = int(nparts)
-        if (nparts < 1) or (nparts > SparkConfig.MAX_NUM_PARTS):
-            raise ValueError('Invalid number of Spark data partitions: %d (must be between 1 and %d)' % (
-            nparts, SparkConfig.MAX_NUM_PARTS))
-        if master == "local":
-            master = "local[%d]" % nexecs
-        return master, nexecs, nparts
+    def get_nparts(self):
+        return self.get_int_arg(RequestParameters.NPARTS, 0)
 
 
 class NexusResults:
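
On the request side, the old "spark" parameter (a master,nexecs,nparts triple such as "local,1,1") is gone; the only remaining knob is the optional integer "nparts", which get_nparts() reads with a default of 0, meaning "choose automatically". A hypothetical client call for illustration (the endpoint path, host/port, and every parameter other than nparts are placeholders, not taken from this PR):

    import requests

    params = {
        "ds": "EXAMPLE_DATASET",                 # placeholder dataset short name
        "b": "-30,-10,30,10",                    # placeholder bounding box
        "startTime": "2015-01-01T00:00:00Z",     # placeholder time range
        "endTime": "2015-12-31T23:59:59Z",
        "nparts": 16,                            # optional; omit or pass 0 for automatic
    }
    resp = requests.get("http://localhost:8083/timeSeriesSpark", params=params)
    print(resp.status_code)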


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
