[ https://issues.apache.org/jira/browse/MAHOUT-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14582717#comment-14582717 ]
ASF GitHub Bot commented on MAHOUT-1660:
----------------------------------------

Github user dlyubimov commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/135#discussion_r32281350

    --- Diff: spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala ---
    @@ -1,50 +1,58 @@
     package org.apache.mahout.sparkbindings.blas
    +import org.apache.mahout.sparkbindings.drm
    +
     import scala.reflect.ClassTag
     import org.apache.mahout.sparkbindings.drm.DrmRddInput
     import org.apache.mahout.math.drm.logical.OpPar
     import org.apache.spark.rdd.RDD
    +import scala.math._
    +
    +import org.apache.mahout.logging._
     /** Physical adjustment of parallelism */
     object Par {
    +  private final implicit val log = getLog(Par.getClass)
    +
       def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
    -    def adjust[T](rdd: RDD[T]): RDD[T] =
    -      if (op.minSplits > 0) {
    -        if (rdd.partitions.size < op.minSplits)
    -          rdd.coalesce(op.minSplits, shuffle = true)
    -        else rdd.coalesce(rdd.partitions.size)
    -      } else if (op.exactSplits > 0) {
    -        if (op.exactSplits < rdd.partitions.size)
    -          rdd.coalesce(numPartitions = op.exactSplits, shuffle = false)
    -        else if (op.exactSplits > rdd.partitions.size)
    -          rdd.coalesce(numPartitions = op.exactSplits, shuffle = true)
    -        else
    -          rdd.coalesce(rdd.partitions.size)
    -      } else if (op.exactSplits == -1 && op.minSplits == -1) {
    -
    -        // auto adjustment, try to scale up to either x1Size or x2Size.
    -        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
    -
    -        val x1Size = (clusterSize * .95).ceil.toInt
    -        val x2Size = (clusterSize * 1.9).ceil.toInt
    -
    -        if (rdd.partitions.size <= x1Size)
    -          rdd.coalesce(numPartitions = x1Size, shuffle = true)
    -        else if (rdd.partitions.size <= x2Size)
    -          rdd.coalesce(numPartitions = x2Size, shuffle = true)
    -        else
    -          rdd.coalesce(numPartitions = rdd.partitions.size)
    -      } else rdd.coalesce(rdd.partitions.size)
    -
    -    if (src.isBlockified) {
    -      val rdd = src.toBlockifiedDrmRdd()
    -      new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd)))
    +    val srcBlockified = src.isBlockified
    +
    +    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
    +    val srcNParts = srcRdd.partitions.size
    +
    +    // To what size?
    +    val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits
    --- End diff --

    This is Scala style, yes; the language did away with the inline ?: (bincond) operator.


> Hadoop1HDFSUtil.readDRMHEader should be taking Hadoop conf
> ----------------------------------------------------------
>
>                 Key: MAHOUT-1660
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1660
>             Project: Mahout
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 0.10.0
>            Reporter: Suneel Marthi
>            Assignee: Dmitriy Lyubimov
>            Priority: Minor
>             Fix For: 0.10.2
>
>
> Hadoop1HDFSUtil.readDRMHEader should be taking the Hadoop configuration from the Context and not ignore it



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
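For readers less familiar with Scala, the style point in the review comment is that Scala has no C-style ?: ternary operator; if/else is itself an expression that yields a value, which is why the new code picks the target partition count with an inline if. Below is a minimal, self-contained sketch of that idiom; the names srcNParts and minSplits mirror the diff, while the demo object and the concrete values are made up purely for illustration:

    object ParStyleDemo extends App {
      val srcNParts = 8    // current partition count of the source RDD (illustrative value)
      val minSplits = 16   // requested minimum parallelism (illustrative value)

      // if/else is an expression, replacing C's `minSplits > 0 ? max(srcNParts, minSplits) : srcNParts`
      val targetParts = if (minSplits > 0) srcNParts max minSplits else srcNParts

      println(s"targetParts = $targetParts")  // prints "targetParts = 16"
    }

The diff's own `val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits` follows the same pattern; the review thread is anchored on that line, so its else branches are cut off in the quoted hunk above.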