[ 
https://issues.apache.org/jira/browse/MAHOUT-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14034291#comment-14034291
 ] 

ASF GitHub Bot commented on MAHOUT-1573:
----------------------------------------

Github user dlyubimov commented on the pull request:

    https://github.com/apache/mahout/pull/13#issuecomment-46358010
  
    @Sebastian, you wanted this; can you take a look and see if this serves 
the purpose? 
    
    @tdunning, Spark allows finer splits (the minSplits parameter) on HDFS 
loading and on intermediate products. In our case, this patch adds a minSplits 
parameter to the fromHDFS() method so that finer-than-initial HDFS load 
parallelism can be specified explicitly. 
    
    In addition to that, parallelism can also be re-adjusted explicitly or 
automatically using the operators I mentioned. 
    
    Automatic parallelism adjustment uses the current parallelism and the Spark 
cluster's default parallelism as guidelines (i.e. it assumes tasks are already 
of somewhat reasonable size, per above).
    
    
    E.g., loading a matrix from HDFS: 
    
        val a = drmFromHDFS(path, minSplits = 100) 
    
    to make sure the number of partitions (i.e. map tasks) is at least 100 on 
matrix load.
    
    This mainly has to do with the fact that algebraic flops often grow 
asymptotically faster than the input size, so in some cases the default 
cluster size is the best guideline in terms of load balancing.
    
    (In fact, since Spark tasks are very cheap to run, I think it is OK to 
split to 400%-500% of default parallelism to achieve a more even load.) 
    
    The optimizer computes ("predicts") the parallelism of shuffles based on 
the existing parallelism of products, but after a long chain of operations 
these predictions may deteriorate (or just not work well for whatever reason). 
In this case we may readjust it explicitly at any checkpoint of the 
expression: 
    
        val drmAtA = (drmA.t %*% drmA) min_|| 100
    
    will make sure that this product will be shuffled into at least 100 
partitions. Analogously, 
    
        val drmAtA = (drmA.t %*% drmA) exact_|| 100
    
    will make sure that this product will be shuffled into exactly 100 
partitions. 
    
    Readjusting parallelism also requires an implicit optimizer checkpoint; 
e.g., the latter would be equivalent to 
    
        val drmAtA = (drmA.t %*% drmA).checkpoint() exact_|| 100
    
    
    Repartitioning does not necessarily invoke a shuffle (e.g. if a partition 
can simply be split into two, it can run as a map-only operation). 
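    A toy model of that map-only split (plain Scala over in-memory sequences, 
not Spark internals; the assumption is that each parent partition maps to a 
contiguous set of child partitions, so no record ever crosses a partition 
boundary and no shuffle is needed):

```scala
// Toy model: growing the partition count without a shuffle.
// Each parent partition is sliced locally into `factor` child partitions;
// records stay in their original relative order and never move between
// parents, which is why this can run as a map-only operation.
object MapOnlySplit {
  def split[T](partitions: Seq[Seq[T]], factor: Int): Seq[Seq[T]] =
    partitions.flatMap { p =>
      // Slice this partition into `factor` roughly equal local chunks.
      val chunk = math.max(1, math.ceil(p.size.toDouble / factor).toInt)
      p.grouped(chunk).toSeq.padTo(factor, Seq.empty[T])
    }
}
```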
    
    This patch also does not contain optimizations that could change the 
parallelism of any previous operations in the physical pipeline. That is 
left as a todo, something to think about.
    



> More explicit parallelism adjustments in math-scala DRM apis; elements of 
> automatic re-adjustments
> --------------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1573
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1573
>             Project: Mahout
>          Issue Type: Task
>    Affects Versions: 0.9
>            Reporter: Dmitriy Lyubimov
>            Assignee: Dmitriy Lyubimov
>             Fix For: 1.0
>
>
> (1) add a minSplits parameter pass-through to drmFromHDFS to be able to 
> explicitly increase parallelism. 
> (2) add a parallelism readjustment parameter to the checkpoint() call. This 
> implies a shuffle-less coalesce() applied to the data set before it is 
> requested to be cached (if specified).
> Going forward, we should probably try to figure out how we can automate it, 
> at least a little bit. For example, the simplest automatic adjustment might 
> re-adjust parallelism on load to simply fit the cluster size (95% or 180% of 
> cluster size, for example), with some rule-of-thumb safeguards, e.g. we 
> cannot exceed a factor of, say, 8 (or whatever we configure) in splitting 
> each original HDFS split. We should be able to get reasonable parallelism 
> performance out of the box with simple heuristics like that.
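The rule of thumb described above could look roughly like this (a sketch only; 
the object and parameter names are hypothetical, and 1.8 and the 8x cap are 
the example numbers from the description):

```scala
// Sketch of the load-time heuristic: aim for some multiple of the
// cluster's default parallelism (e.g. 95% or 180% of it), but never
// split an original HDFS split more than maxSplitFactor ways, and
// never decrease the parallelism HDFS already gives us.
object SplitHeuristic {
  def targetSplits(hdfsSplits: Int,
                   clusterParallelism: Int,
                   fill: Double = 1.8,
                   maxSplitFactor: Int = 8): Int = {
    val desired = math.ceil(clusterParallelism * fill).toInt
    math.min(math.max(hdfsSplits, desired), hdfsSplits * maxSplitFactor)
  }
}
```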



--
This message was sent by Atlassian JIRA
(v6.2#6252)
