[ https://issues.apache.org/jira/browse/MAHOUT-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201863#comment-15201863 ]

Andrew Palumbo edited comment on MAHOUT-1809 at 3/18/16 6:09 PM:
-----------------------------------------------------------------

{code}
test("dspca") {

    val rnd = RandomUtils.getRandom

    // Number of points
    val m = 500
    // Length of actual spectrum
    val spectrumLen = 40

    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
    printf("spectrum:%s\n", spectrum)

    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
      ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))

    // PCA Rotation matrix -- should also be orthonormal.
    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)

    val input = (u %*%: diagv(spectrum)) %*% tr.t
    val drmInput = drmParallelize(m = input, numPartitions = 2)

    // Calculate just the first 10 principal factors and reduce dimensionality.
    // Since we assert only the validity of the s-pca, not the stochastic error,
    // we bump the p parameter to drive the stochastic error to zero and assert
    // only the functional correctness of the method's pca-specific additions.
    val k = 10

    var (drmPCA, _, s) = dspca(drmA = drmInput, k = k, p = spectrumLen, q = 1)
    // Un-normalized pca data:
    drmPCA = drmPCA %*% diagv(s)

    val pca = drmPCA.checkpoint(CacheHint.NONE).collect

    // Once the pca is calculated, the spectrum is going to be different since
    // the originally generated input was not centered. So here we just
    // brute-solve the pca to verify.
    val xi = input.colMeans()
    for (r <- 0 until input.nrow) input(r, ::) -= xi
    var (pcaControl, _, sControl) = svd(m = input)
    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)

    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))

    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5

  }

{code}
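For reference, in the notation of the test above, the synthetic input and the final assertion amount to the following (with P the un-normalized dspca result {{drmPCA %*% diagv(s)}} and P-hat the brute-force SVD control computed on the mean-centered input):

{code}
A = U \,\mathrm{diag}(\sigma)\, T^{\top}, \qquad \sigma_i = \max(300\, e^{-i},\ 10^{-3}), \quad i = 0, \dots, 39
\big|\ \|P_{0..10,\,0..10}\| - \|\hat{P}_{0..10,\,0..10}\|\ \big| < 10^{-5}
{code}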

This is the base test, which currently passes on Spark and H2O but fails on Flink.

By checkpointing {{drmInput}} immediately upon parallelization, the test will pass:
{code} 
    val drmInput = drmParallelize(m = input, numPartitions = 2).checkpoint()
{code}
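
For context, here is a minimal sketch (names exactly as in the test above; assumes the suite's distributed context is in scope) of how the pinned input threads through the rest of the test. The working hypothesis is that {{checkpoint()}} materializes the DRM, so downstream actions reuse the same physical data instead of re-evaluating the source lineage:
{code}
// Parallelize and immediately pin the input matrix.
val drmInput = drmParallelize(m = input, numPartitions = 2).checkpoint()

// The rest of the pipeline is unchanged: dspca now reads the checkpointed DRM.
var (drmPCA, _, s) = dspca(drmA = drmInput, k = k, p = spectrumLen, q = 1)
drmPCA = drmPCA %*% diagv(s)
val pca = drmPCA.checkpoint(CacheHint.NONE).collect
{code}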



> Failing tests in Flink-bindings: dals and dspca
> -----------------------------------------------
>
>                 Key: MAHOUT-1809
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1809
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> {{dspca}} and {{dals}} are failing in the Flink distributed decomposition suite with numerical and OOM errors, respectively:
> {{dspca}} Failure and stack trace:
> {code}
> 54.69239412917543 was not less than 1.0E-5
> ScalaTestFailureLocation: org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4 at (FailingTestsSuite.scala:230)
> org.scalatest.exceptions.TestFailedException: 54.69239412917543 was not less than 1.0E-5
>       at org.scalatest.MatchersHelper$.newTestFailedException(MatchersHelper.scala:160)
>       at org.scalatest.Matchers$ShouldMethodHelper$.shouldMatcher(Matchers.scala:6231)
>       at org.scalatest.Matchers$AnyShouldWrapper.should(Matchers.scala:6265)
>       at org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4.apply$mcV$sp(FailingTestsSuite.scala:230)
>       at org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4.apply(FailingTestsSuite.scala:186)
>       at org.apache.mahout.flinkbindings.FailingTestsSuite$$anonfun$4.apply(FailingTestsSuite.scala:186)
>       at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>       at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>       at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>       at org.scalatest.Transformer.apply(Transformer.scala:22)
>       at org.scalatest.Transformer.apply(Transformer.scala:20)
>       at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
>       at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
>       at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
>       at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
>       at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>       at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>       at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>       at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
>       at org.apache.mahout.flinkbindings.FailingTestsSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FailingTestsSuite.scala:48)
>       at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
>       at org.apache.mahout.flinkbindings.FailingTestsSuite.runTest(FailingTestsSuite.scala:48)
>       at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>       at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>       at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
>       at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>       at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
>       at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
>       at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
>       at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
>       at org.scalatest.Suite$class.run(Suite.scala:1424)
>       at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
>       at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>       at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>       at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
>       at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
>       at org.apache.mahout.flinkbindings.FailingTestsSuite.org$scalatest$BeforeAndAfterAllConfigMap$$super$run(FailingTestsSuite.scala:48)
>       at org.scalatest.BeforeAndAfterAllConfigMap$class.liftedTree1$1(BeforeAndAfterAllConfigMap.scala:248)
>       at org.scalatest.BeforeAndAfterAllConfigMap$class.run(BeforeAndAfterAllConfigMap.scala:247)
>       at org.apache.mahout.flinkbindings.FailingTestsSuite.run(FailingTestsSuite.scala:48)
>       at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
>       at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
>       at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
>       at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
>       at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
>       at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
>       at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
>       at org.scalatest.tools.Runner$.run(Runner.scala:883)
>       at org.scalatest.tools.Runner.run(Runner.scala)
>       at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
>       at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
> {{dals}} Failure:
> {code}
> An exception or error caused a run to abort: Java heap space
> java.lang.OutOfMemoryError: Java heap space
>       at java.util.Arrays.copyOf(Arrays.java:2271)
>       at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>       at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>       at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>       at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1893)
>       at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1874)
>       at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>       at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>       at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>       at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>       at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:893)
>       at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:286)
>       at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>       at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>       at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>       at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
> {code}


