[ https://issues.apache.org/jira/browse/MAHOUT-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14034652#comment-14034652 ]

ASF GitHub Bot commented on MAHOUT-1573:
----------------------------------------

Github user tdunning commented on the pull request:

    https://github.com/apache/mahout/pull/13#issuecomment-46383131
  
    Why the fancy syntax?
    
    Why not a function or method notation?
    
    At the very least, why use punctuation in the name rather than spelling it
    out?
    
    
    
    
    On Tue, Jun 17, 2014 at 1:05 PM, Dmitriy Lyubimov <[email protected]>
    wrote:
    
    > @Sebastian <https://github.com/Sebastian>, you wanted this, can you take
    > a look if this serves the purpose?
    >
    > @tdunning <https://github.com/tdunning>, Spark allows finer splits
    > (minSplits parameter) on HDFS loading and on intermediate products. In our
    > case, this patch adds a minSplits parameter to the fromHDFS() method to
    > allow specifying finer-than-initial HDFS load parallelism explicitly.
    >
    > In addition to that, parallelism can also be re-adjusted explicitly or
    > automatically using the operators I mentioned.
    >
    > Automatic parallelism adjustment uses the current parallelism and the
    > Spark cluster's default parallelism as guidelines (i.e. it assumes tasks
    > are already of somewhat reasonable size, per above).
    >
    > E.g., loading a matrix from HDFS:
    >
    > val a = drmFromHDFS(path, minSplits = 100)
    >
    > to make sure the number of partitions (i.e. map tasks) is at least 100 on
    > matrix load.
    >
    > This mainly has to do with the fact that algebraic flops often grow
    > asymptotically faster than the input size, so in some cases the default
    > cluster size is the best guideline in terms of load balancing.
    >
    > (In fact, since Spark tasks are super cheap to run, I think it is OK to
    > split to 400-500% of default parallelism to achieve a more even load.)
    >
    > The optimizer computes ("predicts") the parallelism of shuffles based on
    > the existing parallelism of the products, but after a long chain of
    > operations these predictions may deteriorate (or just not work well for
    > whatever reason). In this case we may readjust it explicitly at any
    > checkpoint of the expression:
    >
    >  val drmAtA = (drmA.t %*% drmA) min_|| 100
    >
    > will make sure that this product will be shuffled into at least 100
    > partitions. Analogously,
    >
    > val drmAtA = (drmA.t %*% drmA) exact_|| 100
    >
    > will make sure that this product will be shuffled into exactly 100
    > partitions.
    >
    > Readjusting parallelism also requires an implicit optimizer checkpoint;
    > e.g. the latter would be equivalent to
    >
    > val drmAtA = (drmA.t %*% drmA).checkpoint() exact_|| 100
    >
    > Repartitioning does not necessarily invoke a shuffle task (e.g. if a
    > partition can just be split into two, it can run as a map-only operation).
    >
    > This patch also does not contain optimizations that could change the
    > parallelism of any previous operations in the physical pipeline. This is
    > left as a todo, something to think about.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/mahout/pull/13#issuecomment-46358010>.
    >
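
For readers skimming the thread, the calls discussed above collected into one sketch. Operator and method names are as given in the comments; the path is a placeholder, and a Spark-backed Mahout distributed context is assumed to be implicitly in scope:

```scala
// Sketch only: assumes Mahout's math-scala DSL with the operators from this
// thread (drmFromHDFS with a minSplits pass-thru, min_|| and exact_||).

// Ensure at least 100 partitions (map tasks) on matrix load.
val drmA = drmFromHDFS("hdfs://...", minSplits = 100)

// Readjust shuffle parallelism at a checkpoint of the expression:
val atLeast100 = (drmA.t %*% drmA) min_|| 100    // at least 100 partitions
val exactly100 = (drmA.t %*% drmA) exact_|| 100  // exactly 100 partitions
```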


> More explicit parallelism adjustments in math-scala DRM apis; elements of 
> automatic re-adjustments
> --------------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1573
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1573
>             Project: Mahout
>          Issue Type: Task
>    Affects Versions: 0.9
>            Reporter: Dmitriy Lyubimov
>            Assignee: Dmitriy Lyubimov
>             Fix For: 1.0
>
>
> (1) Add a minSplits parameter pass-thru to drmFromHDFS to be able to increase 
> parallelism explicitly. 
> (2) Add a parallelism readjustment parameter to the checkpoint() call. If 
> specified, this implies a shuffle-less coalesce() applied to the data set 
> before it is cached.
> Going forward, we probably should try to figure out how to automate this, at 
> least a little. For example, the simplest automatic adjustment might 
> re-adjust parallelism on load to simply fit the cluster size (95% or 180% of 
> cluster size, for example), with some rule-of-thumb safeguards, e.g. we 
> cannot exceed a factor of, say, 8 (or whatever we configure) when splitting 
> each original HDFS split. We should be able to get reasonable parallelism 
> performance out of the box with simple heuristics like that.
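
The rule of thumb in the description could be sketched as a standalone helper. The function name, the 1.8x target multiplier, and the default cap are illustrative assumptions drawn from the examples above, not Mahout API:

```scala
// Illustrative heuristic only (not Mahout API): pick a target partition count
// near the cluster's default parallelism, but never split any original HDFS
// split by more than a configured factor (8 here, per the example above).
def suggestParallelism(hdfsSplits: Int,
                       clusterDefaultPar: Int,
                       maxSplitFactor: Int = 8): Int = {
  // Aim somewhat above default parallelism (180% per the example above).
  val target = math.ceil(clusterDefaultPar * 1.8).toInt
  // Safeguard: cap the per-split blow-up factor.
  val cap = hdfsSplits * maxSplitFactor
  // Never go below the natural HDFS split count.
  math.max(hdfsSplits, math.min(target, cap))
}

// E.g. 10 HDFS splits on a cluster with default parallelism 100:
// target = 180, but the 8x cap limits the result to 80 partitions.
// suggestParallelism(hdfsSplits = 10, clusterDefaultPar = 100) == 80
```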



--
This message was sent by Atlassian JIRA
(v6.2#6252)
