I think both are bugs. They are triggered by the different memory configurations.
@chiwan: is the 2nd error fixed by your recent change?
@felix: if yes, can you try the 2nd run again with the changes?

On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com> wrote:

> Hi,
>
> I played a bit with the ALS recommender algorithm. I used the movielens
> dataset:
> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
>
> The rating matrix has 21.063.128 entries (ratings).
>
> I run the algorithm with 3 configurations:
>
> 1. standard jvm heap space:
>
> val als = ALS()
>   .setIterations(10)
>   .setNumFactors(10)
>   .setBlocks(100)
>
> throws:
> java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>   at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
>
> 2. 5G jvm heap space
>
> val als = ALS()
>   .setIterations(10)
>   .setNumFactors(10)
>   .setBlocks(150)
>
> throws:
>
> java.lang.NullPointerException
>   at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>   at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>   at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
>
> 3. 14G jvm heap space
>
> val als = ALS()
>   .setIterations(10)
>   .setNumFactors(10)
>   .setBlocks(150)
>   .setTemporaryPath("/tmp/tmpALS")
>
> -> works
>
> Is this a Flink problem or is it just my bad configuration?
>
> Best regards,
> Felix