[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-04-11 Thread Hudson (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234743#comment-15234743 ]

Hudson commented on MAHOUT-1810:


FAILURE: Integrated in Mahout-Quality #3324 (See
[https://builds.apache.org/job/Mahout-Quality/3324/])

MAHOUT-1810: Failing test in flink-bindings: A + B Identically
(apalumbo: rev f4f42ae4c4c7555659edcc43669fec82f9537219)
* flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala

MAHOUT-1810: Use method taken from FlinkMLTools for CheckpointedFlinkDrm
(apalumbo: rev 202b94f840286d4d0970f0427122697ba27fc1fb)
* flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
* flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala


> Failing test in flink-bindings: A + B Identically partitioned (mapBlock
> Checkpointing issue)
> ------------------------------------------------------------------------
>
>                 Key: MAHOUT-1810
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1810
>             Project: Mahout
>          Issue Type: Bug
>          Components: Flink
>    Affects Versions: 0.11.2
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> the {{A + B, Identically Partitioned}} test in the Flink RLikeDrmOpsSuite
> fails.  This test failure likely indicates an issue with Flink's
> Checkpointing or mapBlock operator:
> {code}
>   test("C = A + B, identically partitioned") {
>     val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
>     val A = drmParallelize(inCoreA, numPartitions = 2)
>
>     // Create B which would be identically partitioned to A. mapBlock()
>     // by default will do the trick.
>     val B = A.mapBlock() {
>       case (keys, block) =>
>         val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble() }
>         keys -> bBlock
>     }
>       // Prevent repeated computation non-determinism
>       // flink problem is here... checkpoint is not doing what it should
>       .checkpoint()
>
>     val inCoreB = B.collect
>     printf("A=\n%s\n", inCoreA)
>     printf("B=\n%s\n", inCoreB)
>
>     val C = A + B
>     val inCoreC = C.collect
>     printf("C=\n%s\n", inCoreC)
>
>     // Actual
>     val inCoreCControl = inCoreA + inCoreB
>     (inCoreC - inCoreCControl).norm should be < 1E-10
>   }
> {code}
> The output shows clearly that the line:
> {code}
> val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble() }
> {code}
> in the {{mapBlock}} closure is being evaluated more than once.
> Output:
> {code}
> A=
> {
>  0 => {0:1.0,1:2.0,2:3.0}
>  1 => {0:3.0,1:4.0,2:5.0}
>  2 => {0:5.0,1:6.0,2:7.0}
> }
> B=
> {
>  0 => {0:0.26203398262809574,1:0.22561543461472167,2:0.23229669514522655}
>  1 => {0:0.1638068194515867,1:0.18751822418846575,2:0.20586366231381614}
>  2 => {0:0.9279465706239354,1:0.2963513448240057,2:0.8866928923235948}
> }
> C=
> {
>  0 => {0:1.7883652623225594,1:2.6401297718606216,2:3.0023341959374195}
>  1 => {0:3.641411452208408,1:4.941233165480053,2:5.381282338548803}
>  2 => {0:5.707434148862531,1:6.022780876943659,2:7.149772825494352}
> }
> {code}





[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205682#comment-15205682 ]

ASF GitHub Bot commented on MAHOUT-1810:


Github user andrewpalumbo closed the pull request at:

https://github.com/apache/mahout/pull/198




[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205620#comment-15205620 ]

ASF GitHub Bot commented on MAHOUT-1810:


Github user smarthi commented on the pull request:

https://github.com/apache/mahout/pull/198#issuecomment-199583714
  
Yeah, please commit this and start another Jira. This is a big win.




[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205516#comment-15205516 ]

ASF GitHub Bot commented on MAHOUT-1810:


Github user andrewpalumbo commented on the pull request:

https://github.com/apache/mahout/pull/198#issuecomment-199569347
  
I suppose that I could just commit this and start another JIRA for the 
caching issue.




[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205368#comment-15205368 ]

ASF GitHub Bot commented on MAHOUT-1810:


GitHub user andrewpalumbo opened a pull request:

https://github.com/apache/mahout/pull/198

MAHOUT-1810 persist data to file system and read back at each cache() call 
(FOR REVIEW)

We still need to investigate Flink's
`ExecutionEnvironment.registerCachedFile` to really cache things for access
at the back end.  However, tests do pass with this.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andrewpalumbo/mahout MAHOUT-1810

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/mahout/pull/198.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #198


commit a9ca8d396eb3963a8e344da1e4e94024af4b437e
Author: Andrew Palumbo 
Date:   2016-03-21T23:15:32Z

Write data to file system and read back at each cache() call
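
For context, a sketch of the write-then-read-back pattern this commit
describes, modeled on FlinkML's {{FlinkMLTools.persist}} (the helper that a
follow-up commit adopts); the method name and job name string here are
illustrative:

{code}
import scala.reflect.ClassTag

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.core.fs.Path

// Write `ds` out with its own serializer, execute the plan, then read the
// result back: the returned DataSet has no lineage to the original plan, so
// a non-deterministic upstream operator cannot be re-executed.
def persist[T: ClassTag: TypeInformation](ds: DataSet[T], path: String): DataSet[T] = {
  val env = ds.getExecutionEnvironment
  val outputFormat = new TypeSerializerOutputFormat[T]
  val filePath = new Path(path)
  outputFormat.setOutputFilePath(filePath)
  outputFormat.setWriteMode(WriteMode.OVERWRITE)
  ds.output(outputFormat)
  env.execute("persist")  // materialize before reading back

  val inputFormat = new TypeSerializerInputFormat[T](ds.getType)
  inputFormat.setFilePath(filePath)
  env.createInput(inputFormat)
}
{code}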






[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A %*% B Identically partitioned

2016-03-20 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200151#comment-15200151 ]

Andrew Palumbo commented on MAHOUT-1810:


I am not sure about how the Flink engine handles the caching of intermediate 
results.





[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A %*% B Identically partitioned

2016-03-19 Thread Dmitriy Lyubimov (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15199864#comment-15199864 ]

Dmitriy Lyubimov commented on MAHOUT-1810:
--

The assumption of identical partitioning depends on the engine. Maybe it
doesn't hold in the case of Flink at all?

In this case (checkpoint or not) the assumption is that collection.map(x=>x)
changes neither the data allocation to splits nor its ordering inside every
split (aka partition). If this holds, then input and output are identically
partitioned.

Therefore, if B = A.map(x=> x...) then A and B are identically partitioned, and
then A + B can be optimized as A.zip(B).map(_._1 + _._2). If A and B are not
identically partitioned, then elementwise binary functions would require a
pre-join, which is much more expensive than zip.

This test simply provokes this optimization (in Spark), but if the engine
doesn't support zips, or the assumption of identical partitioning does not
hold, then the engine optimizer should rectify the situation by always
executing join() after mapBlocks. Check back with me for more info on where to
hack it if that is indeed the case.
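
To make the zip-vs-join distinction concrete, a toy illustration over plain
Scala collections (illustrative only, not engine code):

{code}
// One split of A and the corresponding split of B = A.map(...). Since map
// preserves both the allocation of elements to splits and their in-split
// order, a positional zip suffices for an elementwise sum:
val aSplit = Seq(1.0, 2.0, 3.0)
val bSplit = aSplit.map(_ * 2.0)
val sumZip = aSplit.zip(bSplit).map { case (a, b) => a + b }

// Without identical partitioning, positions cannot be trusted, and the sum
// needs an explicit key-based join, which is far more expensive at scale:
val aKeyed = aSplit.zipWithIndex.map { case (v, k) => k -> v }.toMap
val bKeyed = bSplit.zipWithIndex.map { case (v, k) => k -> v }.toMap
val sumJoin = aKeyed.map { case (k, a) => k -> (a + bKeyed(k)) }
{code}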




[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A %*% B Identically partitioned

2016-03-19 Thread Suneel Marthi (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200153#comment-15200153 ]

Suneel Marthi commented on MAHOUT-1810:
---

There's a DataSetUtils which has methods for zipping DataSets.
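
A hedged sketch of that idea, assuming Flink's {{zipWithIndex}} utility: attach
indices to both sides and join on them to emulate a zip. (The helper name is
made up; whether the assigned indices actually line up row-for-row is exactly
the identical-partitioning assumption discussed above.)

{code}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._  // adds zipWithIndex to DataSet

def zipLike(a: DataSet[Double], b: DataSet[Double]): DataSet[Double] = {
  val aIdx = a.zipWithIndex  // DataSet[(Long, Double)]
  val bIdx = b.zipWithIndex
  // Join on the generated index to pair up elements positionally.
  aIdx.join(bIdx).where(0).equalTo(0) { (l, r) => l._2 + r._2 }
}
{code}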



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned

2016-03-19 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201922#comment-15201922 ]

Andrew Palumbo commented on MAHOUT-1810:


[~dlyubimov] I am almost positive that the failure is due to checkpointing
failing on mapBlock.  I believe that this is causing the test failures on dspca
(MAHOUT-1809), i.e., the internal mapBlock call in dspca. Could you let me know
what your thoughts are on the join fix when you get a chance?



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A %*% B Identically partitioned

2016-03-19 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15198693#comment-15198693 ]

Andrew Palumbo commented on MAHOUT-1810:


Flink checkpointing seems to be working fine on other computational paths
(other than mapBlock).
{code}
  test("Checkpoint test") {

    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
    val inCoreB = inCoreA.like := { (r, c, v) => util.Random.nextDouble() }

    val A = drmParallelize(inCoreA, numPartitions = 2)
    val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()

    val C = (B %*% A.t)
    val D = (B %*% A.t)

    val inCoreC = C.collect
    val inCoreD = D.collect

    println(inCoreC)
    (inCoreC - inCoreD).norm should be < 1E-10
  }
{code}
passes.



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A %*% B Identically partitioned

2016-03-19 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200163#comment-15200163 ]

Andrew Palumbo commented on MAHOUT-1810:


From the above tests, though, we can see that in the case of A %*% B where B
is a checkpointed non-deterministic matrix, checkpointing is creating a
barrier.  I should have illustrated this better by using A + B and also
providing the failure case. (I was also using it as a sanity test for
FlinkOpAtB.)  But it does in fact fail without the checkpoint of B.

It seems that checkpointing the mapBlock result is the culprit.  Maybe this
speaks to your point about intermediate caching.



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-19 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15202159#comment-15202159 ]

Andrew Palumbo commented on MAHOUT-1810:


As far as I know there is currently no caching in the same sense as is done in
Spark.  I've been told this is to come in future releases.

I am just getting familiar with the Flink engine.  I wonder if we can use the
{{DistributedFileCache}}?

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/cache/DistributedCache.DistributedCacheEntry.html
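
For reference, a minimal sketch of that Flink distributed-cache API (the file
path and registration name below are made up for illustration):

{code}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Register a file once on the environment; Flink ships it to all task nodes.
env.registerCachedFile("hdfs:///tmp/drm-cache/part-0", "cachedDrmPart")

val ds = env.fromElements(1, 2, 3).map(new RichMapFunction[Int, Int] {
  override def map(value: Int): Int = {
    // Each task can open its local copy of the registered file.
    val localCopy: java.io.File =
      getRuntimeContext.getDistributedCache.getFile("cachedDrmPart")
    value
  }
})
{code}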



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A %*% B Identically partitioned

2016-03-19 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200134#comment-15200134 ]

Andrew Palumbo commented on MAHOUT-1810:


Thanks, I will check back in with you on the mapBlock join fix when I'm more
focused on this issue.  I have MAHOUT-1815 more in my head right now, but I
suspect that Flink partitioning has something to do with all of our major
bugs.

I don't believe that the Flink engine supports zips for DataSets. In the case
of Flink AewB, a join is always done via {{DataSet.coGroup}}:

{code}
val rowsA = A.asRowWise.ds
val rowsB = B.asRowWise.ds
implicit val kTag = op.keyClassTag

val res: DataSet[(K, Vector)] =
  rowsA
    .coGroup(rowsB)
    .where(0)
    .equalTo(0) {
      (left, right, out: Collector[(K, Vector)]) =>
        (left.toIterable.headOption, right.toIterable.headOption) match {
          case (Some((idx, a)), Some((_, b))) => out.collect((idx, function(a, b)))
          case (None, Some(b)) => out.collect(b)
          case (Some(a), None) => out.collect(a)
          case (None, None) => throw new RuntimeException(
            "At least one side of the co group must be non-empty.")
        }
    }
{code}



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A %*% B Identically partitioned

2016-03-19 Thread Dmitriy Lyubimov (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15199914#comment-15199914 ]

Dmitriy Lyubimov commented on MAHOUT-1810:
--

Another note is that checkpoint() (at least in the case of Spark) would not
prevent computation non-determinism in case a partition is lost and
subsequently recomputed.

It _may_ have an effect on double executions if the engine indeed recomputes
the input A again as part of A + B, so maybe yes, checkpoint is not doing what
it is supposed to do for Flink, i.e., it does not create an optimization
barrier here and/or does not cache the intermediate result by default.



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)

2016-03-19 Thread Dmitriy Lyubimov (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15202129#comment-15202129 ]

Dmitriy Lyubimov commented on MAHOUT-1810:
--

So if checkpoint doesn't cache (and the intent here is to get rid of
non-determinism in this test), that is a formal non-adherence to the contract
of checkpoint and its caching capabilities (the CacheHint parameter).

So you are saying there's no way to cope with this?

I think, in the worst case, the solution should seek to dump the intermediate
checkpoint (for cache hints other than None) to dfs or an in-memory file
system.  Various people are telling me that dfs can now have a pretty sizeable
local cache configured too, so persistence is not so bad (but not as good as
keeping object trees in the same jvm, of course).
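
In Mahout's DSL, the contract being described would look roughly like this (a
sketch assuming the usual {{checkpoint(CacheHint)}} signature and an implicit
distributed context in scope):

{code}
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._

// Assumes an implicit DistributedContext (Flink, Spark, ...) in scope.
val A = drmParallelize(dense((1, 2, 3), (3, 4, 5)), numPartitions = 2)

// checkpoint(cacheHint) should both cut the logical plan (an optimization
// barrier) and pin the physical result, so the random block is drawn once.
val B = A.mapBlock() { case (keys, block) =>
  keys -> (block.like() := { (_, _, _) => util.Random.nextDouble() })
}.checkpoint(CacheHint.MEMORY_ONLY)

// With the contract honored, repeated actions must observe the same B.
val b1 = B.collect
val b2 = B.collect
require((b1 - b2).norm < 1e-10)
{code}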



[jira] [Commented] (MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned

2016-03-19 Thread Andrew Palumbo (JIRA)

[ https://issues.apache.org/jira/browse/MAHOUT-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201713#comment-15201713 ]

Andrew Palumbo commented on MAHOUT-1810:


Flink does cache intermediate results of a task by default:

https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html

However, caching of a DRM to memory is not possible, and the call to
{{CheckpointedFlinkDrm.cache(...)}} is only stubbed out.

That said, this is the same situation in H2O, and this test passes without
issue there.
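
A hypothetical sketch (not the actual Mahout source) of what a stubbed-out
cache looks like, and why non-determinism leaks through:

{code}
// With a no-op cache, nothing is pinned: every action that consumes the DRM
// re-runs its whole plan, including any non-deterministic mapBlock closure,
// so each use may observe different values.
def cache(): CheckpointedDrm[K] = this  // stub: no materialization happens
{code}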





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)