[
https://issues.apache.org/jira/browse/MAHOUT-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14034291#comment-14034291
]
ASF GitHub Bot commented on MAHOUT-1573:
----------------------------------------
Github user dlyubimov commented on the pull request:
https://github.com/apache/mahout/pull/13#issuecomment-46358010
@Sebastian, you wanted this, can you take a look and see if this serves the
purpose?
@tdunning, Spark allows finer splits (the minSplits parameter) on HDFS loading
and on intermediate products. In our case, this patch adds a minSplits parameter
to the fromHDFS() method to allow specifying finer-than-initial HDFS load
parallelism explicitly.
In addition to that, parallelism can also be re-adjusted explicitly or
automatically using the operators I mentioned.
Automatic parallelism adjustment uses the current parallelism and the Spark
cluster's default parallelism as guidelines (i.e. it assumes tasks are already
of a somewhat reasonable size, per above).
E.g., loading a matrix from HDFS with
val a = drmFromHDFS(path, minSplits = 100)
makes sure the number of partitions (i.e. map tasks) is at least 100 on
matrix load.
This mainly has to do with the fact that algebraic flops often grow
asymptotically faster than the input size, so in some cases the default cluster
parallelism is the best guideline in terms of load balancing.
(In fact, since Spark tasks are super cheap to run, I think it is OK to
split to 400-500% of default parallelism to achieve a more even load.)
The optimizer computes ("predicts") the parallelism of shuffles based on the
existing parallelism of the intermediate products, but after a long chain of
operations these predictions may deteriorate (or just not work well for whatever
reason). In that case we may readjust it explicitly at any checkpoint of the
expression:
val drmAtA = (drmA.t %*% drmA) min_|| 100
will make sure this product is shuffled into at least 100
partitions. Analogously,
val drmAtA = (drmA.t %*% drmA) exact_|| 100
will make sure this product is shuffled into exactly 100
partitions.
Readjusting parallelism also requires an implicit optimizer checkpoint; e.g.
the latter would be equivalent to
val drmAtA = (drmA.t %*% drmA).checkpoint() exact_|| 100
Repartitioning does not necessarily invoke a shuffle task (e.g. if a
partition can simply be split in two, it can run as a map-only operation).
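To tie the above together, here is a minimal sketch combining the pieces. It uses the operator names as proposed in this PR (drmFromHDFS, min_||), which may change before merge; it assumes an implicit Spark-backed Mahout distributed context is in scope and a path value pointing at the matrix on HDFS, so it is illustrative rather than standalone-runnable, and the import paths are indicative:

```scala
// Sketch only: assumes a running Spark-backed Mahout distributed context
// and the operator names proposed in this PR.
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Load with at least 100 partitions (map tasks), even if the file
// produces fewer HDFS splits.
val drmA = drmFromHDFS(path, minSplits = 100)

// A'A usually wants more parallelism than the load did (flops grow
// faster than the input size); request at least 100 partitions on the
// shuffle. Note this implies an implicit optimizer checkpoint.
val drmAtA = (drmA.t %*% drmA) min_|| 100
```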
This patch also does not contain optimizations that could change the
parallelism of any previous operations in the physical pipeline. That is
left as a TODO, something to think about.
> More explicit parallelism adjustments in math-scala DRM apis; elements of
> automatic re-adjustments
> --------------------------------------------------------------------------------------------------
>
> Key: MAHOUT-1573
> URL: https://issues.apache.org/jira/browse/MAHOUT-1573
> Project: Mahout
> Issue Type: Task
> Affects Versions: 0.9
> Reporter: Dmitriy Lyubimov
> Assignee: Dmitriy Lyubimov
> Fix For: 1.0
>
>
> (1) add a minSplits parameter pass-through to drmFromHDFS to be able to explicitly
> increase parallelism.
> (2) add a parallelism readjustment parameter to the checkpoint() call. This
> implies a shuffle-less coalesce() translation applied to the data set before it is
> requested to be cached (if specified).
> Going forward, we probably should try and figure out how we can automate it, at
> least a little bit. For example, the simplest automatic adjustment might
> include re-adjusting parallelism on load to simply fit cluster size (95% or 180%
> of cluster size, for example), with some rule-of-thumb safeguards here, e.g.
> we cannot exceed a factor of say 8 (or whatever we configure) in splitting
> each original HDFS split. We should be able to get reasonable parallelism
> performance out of the box with simple heuristics like that.
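The rule-of-thumb auto-adjustment described above could be sketched as a pure function. The name adjustedParallelism is hypothetical, and the 180% target and factor-of-8 cap are just the illustrative numbers from the description, not a committed API:

```scala
// Hypothetical sketch of the heuristic above: aim for some fraction of
// cluster cores, but never drop below the original HDFS split count and
// never split each original split more than maxSplitFactor ways.
object ParallelismHeuristic {
  def adjustedParallelism(hdfsSplits: Int,
                          clusterCores: Int,
                          targetFraction: Double = 1.8,
                          maxSplitFactor: Int = 8): Int = {
    val target = math.ceil(clusterCores * targetFraction).toInt
    math.min(math.max(target, hdfsSplits), hdfsSplits * maxSplitFactor)
  }
}
```

For example, 10 HDFS splits on a 100-core cluster would target 180 partitions but be capped by the factor-of-8 safeguard at 80; a file with 100 splits on a 16-core cluster would keep its original 100 partitions.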
--
This message was sent by Atlassian JIRA
(v6.2#6252)