[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200163#comment-15200163 ]

Andrew Palumbo edited comment on MAHOUT-1810 at 3/18/16 4:21 PM:
-----------------------------------------------------------------

-From the above tests, though, we can see that in the case of {{B %*% A.t}}, where 
{{B}} is a checkpointed non-deterministic matrix, checkpointing is creating a 
barrier.  I should have illustrated this better by using A + B and also providing 
a failure case. (I was also using it as a sanity test for FlinkOpAtB.)  But it 
does in fact fail without the checkpoint of {{B}}.-

*Edit Note*: Same as above.

It seems that checkpointing the mapBlock result is the culprit.  Maybe this 
speaks to your point about intermediate caching.
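
To make the intermediate-caching point concrete, here is a minimal plain-Scala sketch (no Mahout or Flink involved). The names {{computeB}}, {{withoutCache}} and {{withCache}} are hypothetical stand-ins, assuming the un-cached pipeline simply re-runs the random closure on every evaluation:

```scala
import scala.util.Random

object RecomputeSketch {
  val a: Vector[Double] = Vector(1.0, 2.0, 3.0)

  // Hypothetical stand-in for an un-cached pipeline: every call re-runs
  // the random "mapBlock" closure and yields fresh values.
  def computeB(): Vector[Double] = a.map(_ => Random.nextDouble())

  def add(x: Vector[Double], y: Vector[Double]): Vector[Double] =
    x.zip(y).map { case (p, q) => p + q }

  // B is evaluated twice: once for "collect", once inside "A + B".
  def withoutCache(): Boolean = {
    val collectedB = computeB()
    val c = add(a, computeB())
    c == add(a, collectedB)
  }

  // B is evaluated once and the same values are reused.
  def withCache(): Boolean = {
    val b = computeB()
    val collectedB = b
    val c = add(a, b)
    c == add(a, collectedB)
  }

  def main(args: Array[String]): Unit = {
    println(s"without cache, C == A + B: ${withoutCache()}")
    println(s"with cache,    C == A + B: ${withCache()}")
  }
}
```

If checkpoint() behaved like {{withCache}}, the collected B and the B used in the sum would be the same values; the failing test looks more like {{withoutCache}}.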


was (Author: andrew_palumbo):
From the above tests, though, we can see that in the case of {{B %*% A.t}}, where 
{{B}} is a checkpointed non-deterministic matrix, checkpointing is creating a 
barrier.  I should have illustrated this better by using A + B and also providing 
a failure case. (I was also using it as a sanity test for FlinkOpAtB.)  But it 
does in fact fail without the checkpoint of {{B}}.

It seems that checkpointing the mapBlock result is the culprit.  Maybe this 
speaks to your point about intermediate caching.

> Failing test in flink-bindings: A + B Identically partitioned
> -------------------------------------------------------------
>
>                 Key: MAHOUT-1810
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1810
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> The {{A + B, Identically Partitioned}} test in the Flink RLikeDrmOpsSuite 
> fails.  This test failure likely indicates an issue with Flink's 
> checkpointing or {{mapBlock}} operator:
> {code}
>   test("C = A + B, identically partitioned") {
>     val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
>     val A = drmParallelize(inCoreA, numPartitions = 2)
>     // Create B which would be identically partitioned to A. mapBlock() by
>     // default will do the trick.
>     val B = A.mapBlock() {
>       case (keys, block) =>
>         val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
>         keys -> bBlock
>     }
>       // Prevent repeated computation non-determinism
>       // flink problem is here... checkpoint is not doing what it should
>       .checkpoint()
>     val inCoreB = B.collect
>     printf("A=\n%s\n", inCoreA)
>     printf("B=\n%s\n", inCoreB)
>     val C = A + B
>     val inCoreC = C.collect
>     printf("C=\n%s\n", inCoreC)
>     // Actual
>     val inCoreCControl = inCoreA + inCoreB
>     (inCoreC - inCoreCControl).norm should be < 1E-10
>   }
> {code}
> The output clearly shows that the line:
> {code}
>         val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
> {code} 
> in the {{mapBlock}} closure is being calculated more than once.
> Output:
> {code}
> A=
> {
>  0 => {0:1.0,1:2.0,2:3.0}
>  1 => {0:3.0,1:4.0,2:5.0}
>  2 => {0:5.0,1:6.0,2:7.0}
> }
> B=
> {
>  0 => {0:0.26203398262809574,1:0.22561543461472167,2:0.23229669514522655}
>  1 => {0:0.1638068194515867,1:0.18751822418846575,2:0.20586366231381614}
>  2 => {0:0.9279465706239354,1:0.2963513448240057,2:0.8866928923235948}
> }
> C=
> {
>  0 => {0:1.7883652623225594,1:2.6401297718606216,2:3.0023341959374195}
>  1 => {0:3.641411452208408,1:4.941233165480053,2:5.381282338548803}
>  2 => {0:5.707434148862531,1:6.022780876943659,2:7.149772825494352}
> }
> {code}
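
As a quick sanity check on the printed output, the residual can be recomputed by hand. The sketch below is plain Scala with the values copied from the output above; it assumes the test's {{norm}} is a Frobenius-style norm, and the object and method names are only illustrative:

```scala
object CheckOutput {
  // Values copied from the printed A, B and C above.
  val a = Array(
    Array(1.0, 2.0, 3.0),
    Array(3.0, 4.0, 5.0),
    Array(5.0, 6.0, 7.0))
  val b = Array(
    Array(0.26203398262809574, 0.22561543461472167, 0.23229669514522655),
    Array(0.1638068194515867, 0.18751822418846575, 0.20586366231381614),
    Array(0.9279465706239354, 0.2963513448240057, 0.8866928923235948))
  val c = Array(
    Array(1.7883652623225594, 2.6401297718606216, 3.0023341959374195),
    Array(3.641411452208408, 4.941233165480053, 5.381282338548803),
    Array(5.707434148862531, 6.022780876943659, 7.149772825494352))

  // Square root of the sum of squared entries of C - (A + B).
  def residualNorm: Double = {
    val squares = for (i <- a.indices; j <- a(i).indices) yield {
      val d = c(i)(j) - (a(i)(j) + b(i)(j))
      d * d
    }
    math.sqrt(squares.sum)
  }

  // True when every entry of C - A lies in [0, 1), i.e. looks like
  // a fresh draw from Random.nextDouble().
  def cMinusALooksRandom: Boolean =
    a.indices.forall { i =>
      a(i).indices.forall { j =>
        val d = c(i)(j) - a(i)(j)
        d >= 0.0 && d < 1.0
      }
    }

  def main(args: Array[String]): Unit = {
    println(f"||C - (A + B)|| = $residualNorm%.6f")
    println(s"C - A entries all in [0, 1): $cMinusALooksRandom")
  }
}
```

The residual is far above the 1E-10 tolerance, while every entry of C - A still falls in [0, 1). That is consistent with B being drawn a second time when C is computed, rather than with an arithmetic error in the addition itself.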



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
