I think the NPE in the second case is a bug in the HashTable.
I just found that ConnectedComponents with small memory segments causes the same
error. (I thought I had fixed the bug, but it is still alive.)
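
For reference, the JoinHint workaround Andra suggests below would look roughly like this (a sketch only; `users` and `items` are hypothetical DataSets keyed on field 0, not the actual joins inside ALS):

```scala
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala._

// Forcing a sort-merge strategy sidesteps the in-memory hash table
// that is running out of managed memory in the traces below.
val joined = users
  .join(items, JoinHint.REPARTITION_SORT_MERGE)
  .where(0)
  .equalTo(0)
```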

Regards,
Chiwan Park
 
> On Jun 5, 2015, at 2:35 AM, Felix Neutatz <neut...@googlemail.com> wrote:
> 
> now the question is, which join in the ALS implementation is the problem :)
> 
> 2015-06-04 19:09 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> 
>> Hi Felix,
>> 
>> Passing a JoinHint to your function should help.
>> see:
>> 
>> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
>> 
>> Cheers,
>> Andra
>> 
>> On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neut...@googlemail.com>
>> wrote:
>> 
>>> after bug fix:
>>> 
>>> For 100 blocks and the standard JVM heap space:
>>> 
>>> Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
>>> recursions, without reducing partitions enough to be memory resident.
>>> Probably cause: Too many duplicate keys.
>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> 
>>> 
>>> For 150 blocks and 5G JVM heap space:
>>> 
>>> Caused by: java.lang.NullPointerException
>>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>> ...
>>> 
>>> Best regards,
>>> Felix
>>> 
>>> 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
>>> 
>>>> Yes, I will try it again with the newest update :)
>>>> 
>>>> 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>> 
>>>>> If the first error is not fixed by Chiwan's PR, then we should create a
>>>>> JIRA for it so that we don't forget it.
>>>>> 
>>>>> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with
>>> this
>>>>> version?
>>>>> 
>>>>> Cheers,
>>>>> Till
>>>>> 
>>>>> [1] https://github.com/apache/flink/pull/751
>>>>> 
>>>>> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanp...@icloud.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi. The second bug is fixed by the recent change in the PR,
>>>>>> but there is no test case for the first bug yet.
>>>>>> 
>>>>>> Regards,
>>>>>> Chiwan Park
>>>>>> 
>>>>>>> On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>> 
>>>>>>> I think both are bugs. They are triggered by the different memory
>>>>>>> configurations.
>>>>>>> 
>>>>>>> @chiwan: is the 2nd error fixed by your recent change?
>>>>>>> 
>>>>>>> @felix: if yes, can you try the 2nd run again with the changes?
>>>>>>> 
>>>>>>> On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com>
>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> I played a bit with the ALS recommender algorithm. I used the
>>>>> movielens
>>>>>>>> dataset:
>>>>>>>> 
>>> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
>>>>>>>> 
>>>>>>>> The rating matrix has 21,063,128 entries (ratings).
>>>>>>>> 
>>>>>>>> I ran the algorithm with 3 configurations:
>>>>>>>> 
>>>>>>>> 1. standard JVM heap space:
>>>>>>>> 
>>>>>>>> val als = ALS()
>>>>>>>>  .setIterations(10)
>>>>>>>>  .setNumFactors(10)
>>>>>>>>  .setBlocks(100)
>>>>>>>> 
>>>>>>>> throws:
>>>>>>>> java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> 
>>>>>>>> 2. 5G JVM heap space:
>>>>>>>> 
>>>>>>>> val als = ALS()
>>>>>>>>  .setIterations(10)
>>>>>>>>  .setNumFactors(10)
>>>>>>>>  .setBlocks(150)
>>>>>>>> 
>>>>>>>> throws:
>>>>>>>> 
>>>>>>>> java.lang.NullPointerException
>>>>>>>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> 
>>>>>>>> 3. 14G JVM heap space:
>>>>>>>> 
>>>>>>>> val als = ALS()
>>>>>>>>  .setIterations(10)
>>>>>>>>  .setNumFactors(10)
>>>>>>>>  .setBlocks(150)
>>>>>>>>  .setTemporaryPath("/tmp/tmpALS")
>>>>>>>> 
>>>>>>>> -> works
>>>>>>>> 
>>>>>>>> Is this a Flink problem or is it just my bad configuration?
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> Felix
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>> 


