[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15198693#comment-15198693 ]
Andrew Palumbo edited comment on MAHOUT-1810 at 3/18/16 4:07 PM:
-----------------------------------------------------------------

-Flink Checkpointing seems to be working fine on other computational paths (other than MapBlock).-

{code}
test("Checkpoint test") {
  val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
  val inCoreB = inCoreA.like := { (r, c, v) => util.Random.nextDouble() }

  val A = drmParallelize(inCoreA, numPartitions = 2)
  val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()

  val C = (B %*% A.t)
  val D = (B %*% A.t)

  val inCoreC = C.collect
  val inCoreD = D.collect

  println(inCoreC)
  (inCoreC - inCoreD).norm should be < 1E-10
}
{code}
passes.

*Edit note*: This test does pass; however, it no longer fails for me when I remove the {{checkpoint}}. That may have something to do with the re-keying fix for MAHOUT-1815, or the test may only have been failing (without the {{checkpoint}}) on another branch, while I was re-working {{FlinkOpAtB}}. Bottom line: this test does not provide much information, since it passes with or without the checkpoint at:
{code}
val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
{code}

was (Author: andrew_palumbo):
Flink Checkpointing seems to be working fine on other computational paths (other than MapBlock).

{code}
test("Checkpoint test") {
  val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
  val inCoreB = inCoreA.like := { (r, c, v) => util.Random.nextDouble() }

  val A = drmParallelize(inCoreA, numPartitions = 2)
  val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()

  val C = (B %*% A.t)
  val D = (B %*% A.t)

  val inCoreC = C.collect
  val inCoreD = D.collect

  println(inCoreC)
  (inCoreC - inCoreD).norm should be < 1E-10
}
{code}
passes.
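
One possible reading of why the checkpoint test passes either way, offered as a sketch and not as a diagnosis: the random values in {{inCoreB}} are drawn in-core before {{drmParallelize}} is called, so re-evaluating {{B}} re-reads the same numbers. In the failing test the random draw happens inside the {{mapBlock}} closure, so a re-evaluation draws new numbers unless the result is cached. The plain-Scala model below illustrates the difference. {{LazyRows}} and its {{checkpoint()}} are hypothetical stand-ins invented for this illustration; they are not Mahout or Flink APIs.

```scala
import scala.util.Random

object CheckpointSketch {
  // Hypothetical stand-in for a lazily evaluated distributed matrix:
  // every collect() re-runs the function that produces the values.
  final class LazyRows(compute: () => Vector[Double]) {
    def collect(): Vector[Double] = compute()

    // Hypothetical stand-in for checkpoint(): evaluate at most once,
    // then serve the cached values on every later collect().
    def checkpoint(): LazyRows = {
      lazy val cached = compute()
      new LazyRows(() => cached)
    }
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42L)

    // Case 1: values drawn eagerly (like inCoreB), then wrapped.
    // Re-evaluation is harmless, with or without a checkpoint.
    val eagerValues = Vector.fill(3)(rng.nextDouble())
    val fromInCore = new LazyRows(() => eagerValues)
    assert(fromInCore.collect() == fromInCore.collect())

    // Case 2: values drawn inside the lazy closure (like the mapBlock test).
    // Each collect() triggers a fresh draw, so the results differ.
    val inClosure = new LazyRows(() => Vector.fill(3)(rng.nextDouble()))
    assert(inClosure.collect() != inClosure.collect())

    // Case 3: the same kind of closure, cached by the stand-in checkpoint().
    val cached = new LazyRows(() => Vector.fill(3)(rng.nextDouble())).checkpoint()
    assert(cached.collect() == cached.collect())

    println("eager: stable, in-closure: unstable, checkpointed: stable")
  }
}
```

If this model is roughly right, only a test that draws its random values inside a lazily evaluated operator (case 2) can tell a working checkpoint from a broken one.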
> Failing test in flink-bindings: A + B Identically partitioned
> -------------------------------------------------------------
>
>                 Key: MAHOUT-1810
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1810
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> The {{A + B, Identically Partitioned}} test in the Flink RLikeDrmOpsSuite fails. This test failure likely indicates an issue with Flink's Checkpointing or mapBlock operator:
> {code}
> test("C = A + B, identically partitioned") {
>   val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
>   val A = drmParallelize(inCoreA, numPartitions = 2)
>
>   // Create B which would be identically partitioned to A.
>   // mapBlock() by default will do the trick.
>   val B = A.mapBlock() {
>     case (keys, block) =>
>       val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble() }
>       keys -> bBlock
>   }
>     // Prevent repeated computation non-determinism
>     // flink problem is here... checkpoint is not doing what it should
>     .checkpoint()
>
>   val inCoreB = B.collect
>   printf("A=\n%s\n", inCoreA)
>   printf("B=\n%s\n", inCoreB)
>
>   val C = A + B
>   val inCoreC = C.collect
>   printf("C=\n%s\n", inCoreC)
>
>   // Actual
>   val inCoreCControl = inCoreA + inCoreB
>   (inCoreC - inCoreCControl).norm should be < 1E-10
> }
> {code}
> The output shows clearly that the line:
> {code}
> val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble() }
> {code}
> in the {{mapBlock}} closure is being calculated more than once.
> Output:
> {code}
> A=
> {
>  0 => {0:1.0,1:2.0,2:3.0}
>  1 => {0:3.0,1:4.0,2:5.0}
>  2 => {0:5.0,1:6.0,2:7.0}
> }
> B=
> {
>  0 => {0:0.26203398262809574,1:0.22561543461472167,2:0.23229669514522655}
>  1 => {0:0.1638068194515867,1:0.18751822418846575,2:0.20586366231381614}
>  2 => {0:0.9279465706239354,1:0.2963513448240057,2:0.8866928923235948}
> }
> C=
> {
>  0 => {0:1.7883652623225594,1:2.6401297718606216,2:3.0023341959374195}
>  1 => {0:3.641411452208408,1:4.941233165480053,2:5.381282338548803}
>  2 => {0:5.707434148862531,1:6.022780876943659,2:7.149772825494352}
> }
> {code}
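
The printed matrices can be checked by hand. The sketch below copies the values of A, B, and C from the output above into plain Scala arrays and computes the residual C - (A + B). It assumes the {{norm}} used in the test is a Frobenius norm, and the object and method names are made up for this illustration. It does not use Mahout.

```scala
object OutputCheck {
  // Values copied verbatim from the printed output.
  val a: Array[Array[Double]] = Array(
    Array(1.0, 2.0, 3.0),
    Array(3.0, 4.0, 5.0),
    Array(5.0, 6.0, 7.0))

  val b: Array[Array[Double]] = Array(
    Array(0.26203398262809574, 0.22561543461472167, 0.23229669514522655),
    Array(0.1638068194515867, 0.18751822418846575, 0.20586366231381614),
    Array(0.9279465706239354, 0.2963513448240057, 0.8866928923235948))

  val c: Array[Array[Double]] = Array(
    Array(1.7883652623225594, 2.6401297718606216, 3.0023341959374195),
    Array(3.641411452208408, 4.941233165480053, 5.381282338548803),
    Array(5.707434148862531, 6.022780876943659, 7.149772825494352))

  // Frobenius norm of C - (A + B); assumed to match the test's `.norm`.
  def residualNorm: Double = {
    val squares = for (i <- 0 until 3; j <- 0 until 3) yield {
      val d = c(i)(j) - (a(i)(j) + b(i)(j))
      d * d
    }
    math.sqrt(squares.sum)
  }

  // True when every entry of C - A lies in [0, 1), the range of
  // util.Random.nextDouble().
  def cMinusALooksLikeFreshDraw: Boolean =
    (0 until 3).forall { i =>
      (0 until 3).forall { j =>
        val d = c(i)(j) - a(i)(j)
        d >= 0.0 && d < 1.0
      }
    }

  def main(args: Array[String]): Unit = {
    println(f"||C - (A + B)|| = $residualNorm%.4f")
    println(s"C - A entries all in [0, 1): $cMinusALooksLikeFreshDraw")
  }
}
```

The residual is many orders of magnitude above the 1E-10 tolerance, and every entry of C - A falls in [0, 1). Both observations are consistent with the B that was added to A being a second, independent random draw, and not the B that was collected and printed.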