Repository: mahout Updated Branches: refs/heads/master c397ef7f7 -> 08e02602e
MAHOUT-1971 Aggregate Transpose Bug closes apache/mahout#307 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/08e02602 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/08e02602 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/08e02602 Branch: refs/heads/master Commit: 08e02602e947ff945b9bd73ab5f0b45863df3e53 Parents: c397ef7 Author: rawkintrevo <[email protected]> Authored: Sat Apr 22 13:58:17 2017 -0500 Committer: rawkintrevo <[email protected]> Committed: Sat Apr 22 13:58:17 2017 -0500 ---------------------------------------------------------------------- .../mahout/flinkbindings/blas/FlinkOpAt.scala | 2 +- .../mahout/flinkbindings/DrmLikeOpsSuite.scala | 20 ++++++++++++++++++++ .../apache/mahout/sparkbindings/blas/At.scala | 2 +- .../mahout/sparkbindings/drm/DrmLikeSuite.scala | 20 ++++++++++++++++++++ 4 files changed, 42 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala index 45214e5..5093216 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala @@ -49,7 +49,7 @@ object FlinkOpAt { val columnVector: Vector = new SequentialAccessSparseVector(ncol) keys.zipWithIndex.foreach { - case (key, idx) => columnVector(key) = block(idx, columnIndex) + case (key, idx) => columnVector(key) += block(idx, columnIndex) } (columnIndex, columnVector) 
http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala index fe2277c..288561b 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala @@ -18,6 +18,8 @@ */ package org.apache.mahout.flinkbindings +import org.apache.mahout.logging.info +import org.apache.mahout.math.DenseMatrix import org.apache.mahout.math.drm.RLikeDrmOps._ import org.apache.mahout.math.drm._ import org.apache.mahout.math.scalabindings.RLikeOps._ @@ -70,4 +72,22 @@ class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite { (emptyDrm.collect - expected).norm should be < 1e-6 } + test("Aggregating transpose") { + + val mxA = new DenseMatrix(20, 10) := 1 + + val drmA = drmParallelize(mxA, numPartitions = 3) + + val reassignedA = drmA.mapBlock() { case (keys, block) ⇒ + keys.map(_ % 3) → block + } + + val mxAggrA = reassignedA.t(::, 0 until 3).collect + + info(mxAggrA.toString) + + mxAggrA(0,0) shouldBe 7 + mxAggrA(0,1) shouldBe 7 + mxAggrA(0,2) shouldBe 6 + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala index fa25b73..b8e6025 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala @@ -62,7 +62,7 @@ object At { // Compute sparse vector. 
This should be quick if we assign values siquentially. val colV: Vector = new SequentialAccessSparseVector(ncol) keys.view.zipWithIndex.foreach({ - case (row, blockRow) => colV(row) = blockA(blockRow, blockCol) + case (row, blockRow) => colV(row) += blockA(blockRow, blockCol) }) blockCol -> colV http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala index e88e7ef..bc6ee72 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala @@ -23,6 +23,7 @@ import scalabindings._ import drm._ import RLikeOps._ import RLikeDrmOps._ +import org.apache.mahout.logging.info import org.apache.mahout.sparkbindings._ import org.apache.mahout.sparkbindings.test.DistributedSparkSuite @@ -139,4 +140,23 @@ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuite assert(dfM === testM) } + + test("Aggregating transpose") { + + val mxA = new DenseMatrix(20, 10) := 1 + + val drmA = drmParallelize(mxA, numPartitions = 3) + + val reassignedA = drmA.mapBlock() { case (keys, block) ⇒ + keys.map(_ % 3) → block + } + + val mxAggrA = reassignedA.t(::, 0 until 3).collect + + info(mxAggrA.toString) + + mxAggrA(0,0) shouldBe 7 + mxAggrA(0,1) shouldBe 7 + mxAggrA(0,2) shouldBe 6 + } }
