MAHOUT-1597: A + 1.0 (fixes)
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/7a50a291 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/7a50a291 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/7a50a291 Branch: refs/heads/spark-1.0.x Commit: 7a50a291b4598e9809f9acf609b92175ce7f953b Parents: e5bc885 Author: Dmitriy Lyubimov <[email protected]> Authored: Wed Aug 6 12:30:51 2014 -0700 Committer: Dmitriy Lyubimov <[email protected]> Committed: Wed Aug 6 12:33:03 2014 -0700 ---------------------------------------------------------------------- .../org/apache/mahout/math/drm/logical/OpAewScalar.scala | 6 +++++- .../mahout/sparkbindings/drm/CheckpointedDrmSpark.scala | 2 +- .../org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala | 8 ++++++++ 3 files changed, 14 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/7a50a291/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala index 3b651f6..19a910c 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala @@ -19,6 +19,7 @@ package org.apache.mahout.math.drm.logical import scala.reflect.ClassTag import org.apache.mahout.math.drm.DrmLike +import scala.util.Random /** Operator denoting expressions like 5.0 - A or A * 5.6 */ case class OpAewScalar[K: ClassTag]( @@ -27,7 +28,10 @@ case class OpAewScalar[K: ClassTag]( val op: String ) extends AbstractUnaryOp[K,K] { - override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag + override protected[mahout] lazy val partitioningTag: Long = + if (A.canHaveMissingRows) + Random.nextLong() + else A.partitioningTag /** Stuff like `A +1` is always supposed to fix this */ override protected[mahout] lazy val canHaveMissingRows: Boolean = false http://git-wip-us.apache.org/repos/asf/mahout/blob/7a50a291/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 03050bb..1c5546b 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -180,7 +180,7 @@ class CheckpointedDrmSpark[K: ClassTag]( val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L val rowCount = rdd.count() _canHaveMissingRows = maxPlus1 != rowCount || - rdd.map(_._1).sum().toLong != ((rowCount -1.0 ) * (rowCount -2.0) /2.0).toLong + rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong intFixExtra = (maxPlus1 - rowCount) max 0L maxPlus1 } else http://git-wip-us.apache.org/repos/asf/mahout/blob/7a50a291/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala index c47f7f1..a5cb7f8 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala @@ -28,6 +28,14 @@ import org.apache.mahout.sparkbindings.test.DistributedSparkSuite /** DRMLike tests -- just run common DRM tests in Spark. */ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuiteBase { + + test("drmParallellize produces drm with no missing rows") { + val inCoreA = dense((1, 2, 3), (3, 4, 5)) + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + drmA.canHaveMissingRows shouldBe false + } + test("DRM blockify dense") { val inCoreA = dense((1, 2, 3), (3, 4, 5))
