Shouldn't Flink figure out on its own how much memory is available for the
join?

The detailed trace for the NullPointerException can be found here:
https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
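
On the "Hash join exceeded maximum number of recursions" error quoted below:
as far as I understand it, a hash join that does not fit into memory
re-partitions the spilled data and retries, and this only helps if a partition
holds several distinct keys. Below is a minimal sketch of that idea in plain
Scala. It is a simplified model, not Flink's MutableHashTable: the object
name, the fan-out of 4, the limit of 3 recursions and the use of base-4 digits
of the key in place of a per-level hash function are all assumptions made for
illustration.

```scala
// Simplified model of recursive hash partitioning; NOT Flink's implementation.
object RecursivePartitioningSketch {
  // Hypothetical limits, chosen only for illustration.
  val MaxRecursionDepth = 3
  val FanOut = 4

  /** Returns true if every partition can be shrunk to at most `capacity`
    * records within MaxRecursionDepth rounds of re-partitioning.
    * Assumes non-negative keys. */
  def fitsInMemory(keys: Seq[Int], capacity: Int, depth: Int = 0): Boolean =
    if (keys.size <= capacity) true
    else if (depth >= MaxRecursionDepth) false
    else {
      // Stand-in for a level-specific hash function: each level looks at a
      // different base-FanOut digit of the key.
      val divisor = BigInt(FanOut).pow(depth).toInt
      keys
        .groupBy(k => (k / divisor) % FanOut)
        .values
        .forall(part => fitsInMemory(part, capacity, depth + 1))
    }

  def main(args: Array[String]): Unit = {
    val distinctKeys = (1 to 1000).toSeq
    val duplicateKeys = Seq.fill(1000)(42)
    println(fitsInMemory(distinctKeys, capacity = 100))
    println(fitsInMemory(duplicateKeys, capacity = 100))
  }
}
```

With 1000 distinct keys the partitions shrink below the capacity after two
rounds, while 1000 copies of the same key always land in one partition, so no
recursion depth would be enough in this model.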

Best regards,
Felix

2015-06-04 19:41 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:

> I think it is not a problem of join hints, but rather of too little memory
> for the join operator. If you set the temporary directory, then the job
> will be split into smaller parts and thus each operator gets more memory.
> Alternatively, you can increase the memory you give to the Task Managers.
>
> The problem with the NullPointerException won't be solved by this, though.
> Could you send the full stack trace for that?
>
> Cheers,
> Till
> On Jun 4, 2015 7:10 PM, "Andra Lungu" <lungu.an...@gmail.com> wrote:
>
> > Hi Felix,
> >
> > Passing a JoinHint to your function should help.
> > see:
> >
> > http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
> >
> > Cheers,
> > Andra
> >
> > On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neut...@googlemail.com>
> > wrote:
> >
> > > after bug fix:
> > >
> > > for 100 blocks and standard jvm heap space
> > >
> > > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
> > > recursions, without reducing partitions enough to be memory resident.
> > > Probably cause: Too many duplicate keys.
> > > at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> > > at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> > > at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> > > at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > for 150 blocks and 5G jvm heap space
> > >
> > > Caused by: java.lang.NullPointerException
> > > at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > ...
> > >
> > > Best regards,
> > > Felix
> > >
> > > 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
> > >
> > > > Yes, I will try it again with the newest update :)
> > > >
> > > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
> > > >
> > > > >> If the first error is not fixed by Chiwan's PR, then we should create a
> > > > >> JIRA for it so that we do not forget it.
> > > >>
> > > > >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
> > > > >> version?
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> [1] https://github.com/apache/flink/pull/751
> > > >>
> > > > >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanp...@icloud.com>
> > > > >> wrote:
> > > >>
> > > > >> > Hi. The second bug is fixed by the recent change in the PR.
> > > > >> > But there is just no test case for the first bug.
> > > >> >
> > > >> > Regards,
> > > >> > Chiwan Park
> > > >> >
> > > >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <u...@apache.org> wrote:
> > > >> > >
> > > > >> > > I think both are bugs. They are triggered by the different memory
> > > > >> > > configurations.
> > > >> > >
> > > >> > > @chiwan: is the 2nd error fixed by your recent change?
> > > >> > >
> > > >> > > @felix: if yes, can you try the 2nd run again with the changes?
> > > >> > >
> > > > >> > > On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com>
> > > > >> > > wrote:
> > > >> > >
> > > >> > >> Hi,
> > > >> > >>
> > > > >> > >> I played a bit with the ALS recommender algorithm. I used the movielens
> > > > >> > >> dataset:
> > > > >> > >>
> > > > >> > >> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
> > > >> > >>
> > > > >> > >> The rating matrix has 21,063,128 entries (ratings).
> > > >> > >>
> > > > >> > >> I ran the algorithm with 3 configurations:
> > > >> > >>
> > > >> > >> 1. standard jvm heap space:
> > > >> > >>
> > > >> > >> val als = ALS()
> > > >> > >>   .setIterations(10)
> > > >> > >>   .setNumFactors(10)
> > > >> > >>   .setBlocks(100)
> > > >> > >>
> > > >> > >> throws:
> > > > >> > >> java.lang.RuntimeException: Hash Join bug in memory management: Memory
> > > > >> > >> buffers leaked.
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > >> > >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > >> > >> at java.lang.Thread.run(Thread.java:745)
> > > >> > >>
> > > >> > >> 2. 5G jvm heap space
> > > >> > >>
> > > >> > >> val als = ALS()
> > > >> > >>   .setIterations(10)
> > > >> > >>   .setNumFactors(10)
> > > >> > >>   .setBlocks(150)
> > > >> > >>
> > > >> > >> throws:
> > > >> > >>
> > > > >> > >> java.lang.NullPointerException
> > > > >> > >> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > >> > >> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > >> > >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > >> > >> at java.lang.Thread.run(Thread.java:745)
> > > >> > >>
> > > >> > >> 3. 14G jvm heap space
> > > >> > >>
> > > >> > >> val als = ALS()
> > > >> > >>   .setIterations(10)
> > > >> > >>   .setNumFactors(10)
> > > >> > >>   .setBlocks(150)
> > > >> > >>   .setTemporaryPath("/tmp/tmpALS")
> > > >> > >>
> > > >> > >> -> works
> > > >> > >>
> > > >> > >> Is this a Flink problem or is it just my bad configuration?
> > > >> > >>
> > > >> > >> Best regards,
> > > >> > >> Felix
> > > >> > >>
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
