[ 
https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200163#comment-15200163
 ] 

Andrew Palumbo edited comment on MAHOUT-1810 at 3/17/16 7:57 PM:
-----------------------------------------------------------------

From the above tests, though, we can see that in the case of {{B %*% A.t}},
where B is a checkpointed non-deterministic matrix, checkpointing is
creating a barrier.  I should have illustrated this better by using
{{A + B}} and also providing a failure case. (I was also using it as a
sanity test for FlinkOpAtB.)  But it does in fact fail without the
checkpoint of B.
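
To make the failure mode concrete, here is a minimal sketch of the case
without the checkpoint of B, assuming only the Samsara primitives already
used in the test below ({{drmParallelize}}, {{mapBlock}}, {{collect}}):

{code}
// Sketch, not the committed test: B is non-deterministic and NOT checkpointed.
val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
val A = drmParallelize(inCoreA, numPartitions = 2)
val B = A.mapBlock() {
  case (keys, block) =>
    // Random entries: re-running this closure produces a different B.
    keys -> (block.like() := { (r, c, v) => util.Random.nextDouble() })
}
// No .checkpoint() here, so collecting B and evaluating A + B each trigger
// a separate evaluation of the random closure.
val inCoreB = B.collect
val inCoreC = (A + B).collect
// Expected to be non-negligible, since two different Bs were materialized:
val err = (inCoreC - (inCoreA + inCoreB)).norm
{code}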

It seems that checkpointing the mapBlock result is the culprit.  Maybe this 
speaks to your point about intermediate caching.
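
For reference, the intended semantics of {{checkpoint()}} here would be to
materialize B once and pin it, so every downstream action reads the same
data. A sketch only, assuming the math-scala {{checkpoint(cacheHint)}}
signature with {{CacheHint}} from {{org.apache.mahout.math.drm}}; whether
the Flink backend actually honors the hint is exactly what is in question:

{code}
// Sketch of the intended barrier semantics, not of the Flink implementation.
val Bc = B.checkpoint(CacheHint.MEMORY_ONLY)
val inCoreB = Bc.collect        // first (and only) materialization of B
val inCoreC = (A + Bc).collect  // should reuse Bc's blocks, not recompute them
{code}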


was (Author: andrew_palumbo):
From the above tests though we can see that in the case of {{A %*% B}},
where B is a checkpointed non-deterministic matrix, checkpointing is
creating a barrier.  I should have illustrated this better by using
A + B and also providing a failure case. (I was also using it as a
sanity test for FlinkOpAtB.)  But it does in fact fail without the
checkpoint of B.

It seems that checkpointing the mapBlock result is the culprit.  Maybe this 
speaks to your point about intermediate caching.

> Failing test in flink-bindings: A + B Identically partitioned
> -------------------------------------------------------------
>
>                 Key: MAHOUT-1810
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1810
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> the {{C = A + B, identically partitioned}} test in the Flink RLikeDrmOpsSuite 
> fails.  This test failure likely indicates an issue with Flink's 
> checkpointing or its mapBlock operator:
> {code}
>   test("C = A + B, identically partitioned") {
>     val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
>     val A = drmParallelize(inCoreA, numPartitions = 2)
>     // Create B which would be identically partitioned to A. mapBlock() by 
> default will do the trick.
>     val B = A.mapBlock() {
>       case (keys, block) =>
>         val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
>         keys -> bBlock
>     }
>       // Prevent repeated computation non-determinism
>       // flink problem is here... checkpoint is not doing what it should
>       .checkpoint()
>     val inCoreB = B.collect
>     printf("A=\n%s\n", inCoreA)
>     printf("B=\n%s\n", inCoreB)
>     val C = A + B
>     val inCoreC = C.collect
>     printf("C=\n%s\n", inCoreC)
>     // Actual
>     val inCoreCControl = inCoreA + inCoreB
>     (inCoreC - inCoreCControl).norm should be < 1E-10
>   }
> {code}
> The output shows clearly that the line:
> {code}
>         val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
> {code} 
> in the {{mapBlock}} closure is being calculated more than once.
> Output:
> {code}
> A=
> {
>  0 => {0:1.0,1:2.0,2:3.0}
>  1 => {0:3.0,1:4.0,2:5.0}
>  2 => {0:5.0,1:6.0,2:7.0}
> }
> B=
> {
>  0 => {0:0.26203398262809574,1:0.22561543461472167,2:0.23229669514522655}
>  1 => {0:0.1638068194515867,1:0.18751822418846575,2:0.20586366231381614}
>  2 => {0:0.9279465706239354,1:0.2963513448240057,2:0.8866928923235948}
> }
> C=
> {
>  0 => {0:1.7883652623225594,1:2.6401297718606216,2:3.0023341959374195}
>  1 => {0:3.641411452208408,1:4.941233165480053,2:5.381282338548803}
>  2 => {0:5.707434148862531,1:6.022780876943659,2:7.149772825494352}
> }
> {code}
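> A quick check of the first entry confirms the recomputation: from the 
> printed output, {{A(0,0) + B(0,0) = 1.0 + 0.26203... = 1.26203...}}, yet 
> {{C(0,0) = 1.78836...}}, so the B that was folded into C is not the B that 
> was collected and printed above.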



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
