Re: batch job OOM

2020-02-05 Thread Jingsong Li
Thanks Fanbin,

I will try to find the bug and track it.

Best,
Jingsong Lee


Re: batch job OOM

2020-02-05 Thread Fanbin Bu
Jingsong,

I created https://issues.apache.org/jira/browse/FLINK-15928 to track the
issue. Let me know if you need anything else to debug.

Thanks,
Fanbin


On Tue, Jan 28, 2020 at 12:54 AM Arvid Heise  wrote:

> Hi Fanbin,
>
> you could use the RC1 of Flink that was created yesterday and use the
> apache repo
> https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/
> .
> Alternatively, if you build Flink locally with `mvn install`, then you
> could use mavenLocal() in your gradle.build and feed from that.
>
> Best,
>
> Arvid
>
> On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu  wrote:
>
>> I can build flink 1.10 and install it on to EMR
>> (flink-dist_2.11-1.10.0.jar). but what about other dependencies in my
>> project build.gradle, ie. flink-scala_2.11, flink-json, flink-jdbc... do I
>> continue to use 1.9.0 since there is no 1.10 available?
>>
>> Thanks,
>> Fanbin
>>
>> On Fri, Jan 24, 2020 at 11:39 PM Bowen Li  wrote:
>>
>>> Hi Fanbin,
>>>
>>> You can install your own Flink build in AWS EMR, and it frees you from
>>> Emr’s release cycles
>>>
>>> On Thu, Jan 23, 2020 at 03:36 Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 I have no idea now, can you created a JIRA to track it? You can
 describe complete SQL and some data informations.

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu 
 wrote:

> Jingsong,
>
> Do you have any suggestions to debug the above mentioned
> IndexOutOfBoundsException error?
> Thanks,
>
> Fanbin
>
> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu 
> wrote:
>
>> I got the following error when running another job. any suggestions?
>>
>> Caused by: java.lang.IndexOutOfBoundsException
>> at
>> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>> at
>> org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>> at HashWinAggWithKeys$538.endInput(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> I set the config value to be too large. After I changed it to a
>>> smaller number it works now!
>>> thanks you for the help. really appreciate it!
>>>
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 Looks like your config is wrong, can you show your config code?

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
 wrote:

> Jingsong,
>
> Great, now i got a different error:
>
> java.lang.NullPointerException: Initial Segment may not be null
>   at 
> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>   at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>   at LocalHashWinAggWithKeys$292.open(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> is there any other config i should add?
>
> thanks,
>
> Fanbin
>
>
> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
> wrote:

Re: batch job OOM

2020-01-28 Thread Arvid Heise
Hi Fanbin,

you could use the RC1 of Flink that was created yesterday, via the
Apache staging repo:
https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/
Alternatively, if you build Flink locally with `mvn install`, you
could use mavenLocal() in your build.gradle and resolve from that.

Best,

Arvid
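
Arvid's mavenLocal() suggestion would look roughly like this in a build.gradle (a sketch: the repository ordering and exact artifact coordinates are assumptions — flink-jdbc in particular may or may not carry a Scala suffix in 1.10):

```groovy
// Sketch: resolve a locally "mvn install"-ed Flink 1.10 build first,
// falling back to the Apache staging repo that hosts the RC artifacts.
repositories {
    mavenLocal()
    maven {
        url 'https://repository.apache.org/content/repositories/orgapacheflink-1325/'
    }
    mavenCentral()
}

def flinkVersion = '1.10.0' // match the RC / local build you installed

dependencies {
    compile "org.apache.flink:flink-scala_2.11:${flinkVersion}"
    compile "org.apache.flink:flink-json:${flinkVersion}"
    compile "org.apache.flink:flink-jdbc:${flinkVersion}"
}
```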


Re: batch job OOM

2020-01-27 Thread Fanbin Bu
I can build Flink 1.10 and install it onto EMR
(flink-dist_2.11-1.10.0.jar), but what about the other dependencies in my
project's build.gradle, i.e. flink-scala_2.11, flink-json, flink-jdbc... do I
continue to use 1.9.0 since there is no 1.10 release available?

Thanks,
Fanbin

Re: batch job OOM

2020-01-24 Thread Bowen Li
Hi Fanbin,

You can install your own Flink build on AWS EMR; that frees you from
EMR's release cycles.


Re: batch job OOM

2020-01-23 Thread Jingsong Li
Fanbin,

I have no idea right now; could you create a JIRA to track it? You can
describe the complete SQL and some information about the data.

Best,
Jingsong Lee

Re: batch job OOM

2020-01-23 Thread Fanbin Bu
Jingsong,

Do you have any suggestions to debug the above-mentioned
IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu  wrote:

> I got the following error when running another job. any suggestions?
>
> Caused by: java.lang.IndexOutOfBoundsException
> at
> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
> at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
> at HashWinAggWithKeys$538.endInput(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>
> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> I had set the config value too large. After I changed it to a
>> smaller number, it works now!
>> Thank you for the help, I really appreciate it!
>>
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
>> wrote:
>>
>>> Fanbin,
>>>
>>> Looks like your config is wrong, can you show your config code?
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 Great, now I got a different error:

 java.lang.NullPointerException: Initial Segment may not be null
   at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
   at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
   at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
   at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
   at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
   at LocalHashWinAggWithKeys$292.open(Unknown Source)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
   at java.lang.Thread.run(Thread.java:748)


 Is there any other config I should add?

 thanks,

 Fanbin


 On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
 wrote:

> You beat me to it.
> Let me try that.
>
> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
> wrote:
>
>> Fanbin,
>>
>> Document is here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>> NOTE: you need to configure this in the TableConfig.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> Thank you for the response.
>>> Since I'm using Flink on EMR and the latest version there is 1.9, the
>>> second option is ruled out, but I will keep it in mind for a future
>>> upgrade.
>>>
>>> I'm going to try the first option. It's probably a good idea to add
>>> that to the doc, for example:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>
>>> Thanks,
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>>> wrote:
>>>
 Hi Fanbin,

 Thanks for using blink batch mode.

 The OOM is caused by there not being enough managed memory for the
 hash aggregation.

 There are three options you can choose from:

 1. Is your version Flink 1.9? 1.9 still uses a fixed memory
 configuration, so you need to increase the hash-agg memory:
 - table.exec.resource.hash-agg.memory: 1024 mb

 2. In 1.10, we use the slot's managed memory to dynamically configure
 the real operator memory, so operators can use more managed memory and
 you don't need to configure the hash-agg memory anymore. You can try
 1.10 RC0 [1]

 3. We can use sort aggregation to avoid the OOM too, but there is no
 config option for it now; I created a JIRA to track it. [2]

 [1]
 
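
Putting option 1 above together with Jingsong's note earlier in the thread that the setting must go into the TableConfig, the 1.9 blink batch setup would look roughly like this (a sketch: only the config key and the "1024 mb" value come from this thread; the surrounding setup follows the linked 1.9 docs):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Sketch: blink planner in batch mode on Flink 1.9.
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val tEnv = TableEnvironment.create(settings)

// Set this on the TableConfig, not in flink-conf.yaml.
// The value is the managed memory for the hash aggregation operator.
tEnv.getConfig.getConfiguration
  .setString("table.exec.resource.hash-agg.memory", "1024 mb")
```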

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I got the following error when running another job. Any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu  wrote:

> Jingsong,
>
> I had set the config value too large. After I changed it to a smaller
> number, it works now!
> Thank you for the help. I really appreciate it!
>
> Fanbin
>
> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
> wrote:
>
>> Fanbin,
>>
>> Looks like your config is wrong, can you show your config code?
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> Great, now i got a different error:
>>>
>>> java.lang.NullPointerException: Initial Segment may not be null
>>> at 
>>> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>>> at 
>>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>>> at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> is there any other config i should add?
>>>
>>> thanks,
>>>
>>> Fanbin
>>>
>>>
>>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
>>> wrote:
>>>
 you beat me to it.
 let's me try that.

 On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
 wrote:

> Fanbin,
>
> Document is here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
> NOTE: you need configure this into TableConfig.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> Thank you for the response.
>> Since I'm using flink on EMR and the latest version is 1.9 now. the
>> second option is ruled out. but will keep that in mind for future 
>> upgrade.
>>
>> I'm going to try the first option. It's probably a good idea to add
>> that in the doc for example:
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>
>> Thanks,
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>> wrote:
>>
>>> Hi Fanbin,
>>>
>>> Thanks for using blink batch mode.
>>>
>>> The OOM is caused by the manage memory not enough in Hash
>>> aggregation.
>>>
>>> There are three options you can choose from:
>>>
>>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration.
>>> So you need increase hash memory:
>>> - table.exec.resource.hash-agg.memory: 1024 mb
>>>
>>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>>> memory, so operator can use more manage memory, so you don't need 
>>> configure
>>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>>
>>> 3.We can use sort aggregation to avoid OOM too, but there is no
>>> config option now, I created JIRA to track it. [2]
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
>>> wrote:
>>>

 tried to increase memory:
 flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong,

I had set the config value too large. After I changed it to a smaller
number, it works now!
Thank you for the help. I really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li  wrote:

> Fanbin,
>
> Looks like your config is wrong, can you show your config code?
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> Great, now i got a different error:
>>
>> java.lang.NullPointerException: Initial Segment may not be null
>>  at 
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>>  at 
>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>>  at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
>> is there any other config i should add?
>>
>> thanks,
>>
>> Fanbin
>>
>>
>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu  wrote:
>>
>>> you beat me to it.
>>> let's me try that.
>>>
>>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 Document is here:
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
 NOTE: you need configure this into TableConfig.

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
 wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using flink on EMR and the latest version is 1.9 now. the
> second option is ruled out. but will keep that in mind for future upgrade.
>
> I'm going to try the first option. It's probably a good idea to add
> that in the doc for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>
>> There are three options you can choose from:
>>
>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration.
>> So you need increase hash memory:
>> - table.exec.resource.hash-agg.memory: 1024 mb
>>
>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>> memory, so operator can use more manage memory, so you don't need 
>> configure
>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>
>> 3.We can use sort aggregation to avoid OOM too, but there is no
>> config option now, I created JIRA to track it. [2]
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
>> wrote:
>>
>>>
>>> tried to increase memory:
>>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>>
>>> and still got the same OOM exception.
>>>
>>> my sql is like:
>>>
>>> select id, hop_end(created_at, interval '30' second, interval '1' 
>>> minute), sum(field)... #20 of these sums
>>>
>>> from table group by id, hop(created_at, interval '30' second, interval 
>>> '1' minute)
>>>
>>>
>>>
>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>>> wrote:
>>>
 Hi,

 I have a batch job using blink planner. and got the following
 error. I was able to successfully run the same job with flink 1.8 on 
 yarn.

 I set conf as:
 taskmanager.heap.size: 5m

 and flink UI gives me
 Last Heartbeat: 20-01-22 14:56:25
 ID: container_1579720108062_0018_01_20
 Data Port: 41029
 Free Slots / All Slots: 1 / 0
 CPU Cores: 16
 Physical Memory: 62.9 GB
 JVM Heap Size: 10.3 GB
 Flink Managed Memory: 24.9 GB

 any suggestions on how to move forward?
 Thanks,
 Fanbin

 Caused by: org.apache.flink.runtime.client.JobExecutionException:
 Job execution failed.
 at
 

Re: batch job OOM

2020-01-22 Thread Jingsong Li
Fanbin,

It looks like your config is wrong; can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu  wrote:

> Jingsong,
>
> Great, now i got a different error:
>
> java.lang.NullPointerException: Initial Segment may not be null
>   at 
> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>   at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>   at LocalHashWinAggWithKeys$292.open(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> is there any other config i should add?
>
> thanks,
>
> Fanbin
>
>
> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu  wrote:
>
>> you beat me to it.
>> let's me try that.
>>
>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
>> wrote:
>>
>>> Fanbin,
>>>
>>> Document is here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>>> NOTE: you need configure this into TableConfig.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 Thank you for the response.
 Since I'm using flink on EMR and the latest version is 1.9 now. the
 second option is ruled out. but will keep that in mind for future upgrade.

 I'm going to try the first option. It's probably a good idea to add
 that in the doc for example:
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

 Thanks,
 Fanbin

 On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
 wrote:

> Hi Fanbin,
>
> Thanks for using blink batch mode.
>
> The OOM is caused by the manage memory not enough in Hash aggregation.
>
> There are three options you can choose from:
>
> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration.
> So you need increase hash memory:
> - table.exec.resource.hash-agg.memory: 1024 mb
>
> 2.In 1.10, we use slot manage memory to dynamic config real operator
> memory, so operator can use more manage memory, so you don't need 
> configure
> hash agg memory anymore. You can try 1.10 RC0 [1]
>
> 3.We can use sort aggregation to avoid OOM too, but there is no config
> option now, I created JIRA to track it. [2]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
> [2] https://issues.apache.org/jira/browse/FLINK-15732
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
> wrote:
>
>>
>> tried to increase memory:
>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>
>> and still got the same OOM exception.
>>
>> my sql is like:
>>
>> select id, hop_end(created_at, interval '30' second, interval '1' 
>> minute), sum(field)... #20 of these sums
>>
>> from table group by id, hop(created_at, interval '30' second, interval 
>> '1' minute)
>>
>>
>>
>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a batch job using blink planner. and got the following error.
>>> I was able to successfully run the same job with flink 1.8 on yarn.
>>>
>>> I set conf as:
>>> taskmanager.heap.size: 5m
>>>
>>> and flink UI gives me
>>> Last Heartbeat: 20-01-22 14:56:25
>>> ID: container_1579720108062_0018_01_20
>>> Data Port: 41029
>>> Free Slots / All Slots: 1 / 0
>>> CPU Cores: 16
>>> Physical Memory: 62.9 GB
>>> JVM Heap Size: 10.3 GB
>>> Flink Managed Memory: 24.9 GB
>>>
>>> any suggestions on how to move forward?
>>> Thanks,
>>> Fanbin
>>>
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>> Job execution failed.
>>> at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at
>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>> ... 25 more
>>>
>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
>>> HashWinAggWithKeys$534.processElement(Unknown Source)
>>> at
>>> 

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong,

Great, now I got a different error:

java.lang.NullPointerException: Initial Segment may not be null
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
    at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
    at LocalHashWinAggWithKeys$292.open(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)


Is there any other config I should add?

Thanks,

Fanbin


On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu  wrote:

> you beat me to it.
> let's me try that.
>
> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
> wrote:
>
>> Fanbin,
>>
>> Document is here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>> NOTE: you need configure this into TableConfig.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> Thank you for the response.
>>> Since I'm using flink on EMR and the latest version is 1.9 now. the
>>> second option is ruled out. but will keep that in mind for future upgrade.
>>>
>>> I'm going to try the first option. It's probably a good idea to add that
>>> in the doc for example:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>
>>> Thanks,
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>>> wrote:
>>>
 Hi Fanbin,

 Thanks for using blink batch mode.

 The OOM is caused by the manage memory not enough in Hash aggregation.

 There are three options you can choose from:

 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
 you need increase hash memory:
 - table.exec.resource.hash-agg.memory: 1024 mb

 2.In 1.10, we use slot manage memory to dynamic config real operator
 memory, so operator can use more manage memory, so you don't need configure
 hash agg memory anymore. You can try 1.10 RC0 [1]

 3.We can use sort aggregation to avoid OOM too, but there is no config
 option now, I created JIRA to track it. [2]

 [1]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
 [2] https://issues.apache.org/jira/browse/FLINK-15732

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
 wrote:

>
> tried to increase memory:
> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>
> and still got the same OOM exception.
>
> my sql is like:
>
> select id, hop_end(created_at, interval '30' second, interval '1' 
> minute), sum(field)... #20 of these sums
>
> from table group by id, hop(created_at, interval '30' second, interval 
> '1' minute)
>
>
>
> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
> wrote:
>
>> Hi,
>>
>> I have a batch job using blink planner. and got the following error.
>> I was able to successfully run the same job with flink 1.8 on yarn.
>>
>> I set conf as:
>> taskmanager.heap.size: 5m
>>
>> and flink UI gives me
>> Last Heartbeat: 20-01-22 14:56:25
>> ID: container_1579720108062_0018_01_20
>> Data Port: 41029
>> Free Slots / All Slots: 1 / 0
>> CPU Cores: 16
>> Physical Memory: 62.9 GB
>> JVM Heap Size: 10.3 GB
>> Flink Managed Memory: 24.9 GB
>>
>> any suggestions on how to move forward?
>> Thanks,
>> Fanbin
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>> ... 25 more
>>
>> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
>> HashWinAggWithKeys$534.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>> at
>> 

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
You beat me to it.
Let me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li  wrote:

> Fanbin,
>
> Document is here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
> NOTE: you need configure this into TableConfig.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> Thank you for the response.
>> Since I'm using flink on EMR and the latest version is 1.9 now. the
>> second option is ruled out. but will keep that in mind for future upgrade.
>>
>> I'm going to try the first option. It's probably a good idea to add that
>> in the doc for example:
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>
>> Thanks,
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>> wrote:
>>
>>> Hi Fanbin,
>>>
>>> Thanks for using blink batch mode.
>>>
>>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>>
>>> There are three options you can choose from:
>>>
>>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
>>> you need increase hash memory:
>>> - table.exec.resource.hash-agg.memory: 1024 mb
>>>
>>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>>> memory, so operator can use more manage memory, so you don't need configure
>>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>>
>>> 3.We can use sort aggregation to avoid OOM too, but there is no config
>>> option now, I created JIRA to track it. [2]
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu 
>>> wrote:
>>>

 tried to increase memory:
 flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar

 and still got the same OOM exception.

 my sql is like:

 select id, hop_end(created_at, interval '30' second, interval '1' minute), 
 sum(field)... #20 of these sums

 from table group by id, hop(created_at, interval '30' second, interval '1' 
 minute)



 On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
 wrote:

> Hi,
>
> I have a batch job using blink planner. and got the following error. I
> was able to successfully run the same job with flink 1.8 on yarn.
>
> I set conf as:
> taskmanager.heap.size: 5m
>
> and flink UI gives me
> Last Heartbeat: 20-01-22 14:56:25
> ID: container_1579720108062_0018_01_20
> Data Port: 41029
> Free Slots / All Slots: 1 / 0
> CPU Cores: 16
> Physical Memory: 62.9 GB
> JVM Heap Size: 10.3 GB
> Flink Managed Memory: 24.9 GB
>
> any suggestions on how to move forward?
> Thanks,
> Fanbin
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 25 more
>
> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
> HashWinAggWithKeys$534.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>

>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I saw the doc at
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html.
Do I have to set that in the code, or can I do it through flink-conf.yaml?

On Wed, Jan 22, 2020 at 7:54 PM Fanbin Bu  wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using flink on EMR and the latest version is 1.9 now. the second
> option is ruled out. but will keep that in mind for future upgrade.
>
> I'm going to try the first option. It's probably a good idea to add that
> in the doc for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>
>> There are three options you can choose from:
>>
>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
>> you need increase hash memory:
>> - table.exec.resource.hash-agg.memory: 1024 mb
>>
>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>> memory, so operator can use more manage memory, so you don't need configure
>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>
>> 3.We can use sort aggregation to avoid OOM too, but there is no config
>> option now, I created JIRA to track it. [2]
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu  wrote:
>>
>>>
>>> tried to increase memory:
>>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>>
>>> and still got the same OOM exception.
>>>
>>> my sql is like:
>>>
>>> select id, hop_end(created_at, interval '30' second, interval '1' minute), 
>>> sum(field)... #20 of these sums
>>>
>>> from table group by id, hop(created_at, interval '30' second, interval '1' 
>>> minute)
>>>
>>>
>>>
>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>>> wrote:
>>>
 Hi,

 I have a batch job using blink planner. and got the following error. I
 was able to successfully run the same job with flink 1.8 on yarn.

 I set conf as:
 taskmanager.heap.size: 5m

 and flink UI gives me
 Last Heartbeat: 20-01-22 14:56:25
 ID: container_1579720108062_0018_01_20
 Data Port: 41029
 Free Slots / All Slots: 1 / 0
 CPU Cores: 16
 Physical Memory: 62.9 GB
 JVM Heap Size: 10.3 GB
 Flink Managed Memory: 24.9 GB

 any suggestions on how to move forward?
 Thanks,
 Fanbin

 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
 execution failed.
 at
 org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
 at
 org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
 ... 25 more

 *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
 HashWinAggWithKeys$534.processElement(Unknown Source)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)

>>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: batch job OOM

2020-01-22 Thread Jingsong Li
Fanbin,

The document is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
NOTE: you need to configure this in the TableConfig.
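
A minimal sketch of what setting this option through the TableConfig might look like (assuming Flink 1.9 with the blink planner on the classpath; treat it as an illustration rather than a verified snippet):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HashAggMemoryExample {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // In 1.9, table.exec.* options are read from the TableConfig,
        // not from flink-conf.yaml.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.hash-agg.memory", "1024 mb");
    }
}
```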

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu  wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using flink on EMR and the latest version is 1.9 now. the second
> option is ruled out. but will keep that in mind for future upgrade.
>
> I'm going to try the first option. It's probably a good idea to add that
> in the doc for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by the manage memory not enough in Hash aggregation.
>>
>> There are three options you can choose from:
>>
>> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
>> you need increase hash memory:
>> - table.exec.resource.hash-agg.memory: 1024 mb
>>
>> 2.In 1.10, we use slot manage memory to dynamic config real operator
>> memory, so operator can use more manage memory, so you don't need configure
>> hash agg memory anymore. You can try 1.10 RC0 [1]
>>
>> 3.We can use sort aggregation to avoid OOM too, but there is no config
>> option now, I created JIRA to track it. [2]
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu  wrote:
>>
>>>
>>> tried to increase memory:
>>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>>
>>> and still got the same OOM exception.
>>>
>>> my sql is like:
>>>
>>> select id, hop_end(created_at, interval '30' second, interval '1' minute), 
>>> sum(field)... #20 of these sums
>>>
>>> from table group by id, hop(created_at, interval '30' second, interval '1' 
>>> minute)
>>>
>>>
>>>
>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu 
>>> wrote:
>>>
 Hi,

 I have a batch job using blink planner. and got the following error. I
 was able to successfully run the same job with flink 1.8 on yarn.

 I set conf as:
 taskmanager.heap.size: 5m

 and flink UI gives me
 Last Heartbeat: 20-01-22 14:56:25
 ID: container_1579720108062_0018_01_20
 Data Port: 41029
 Free Slots / All Slots: 1 / 0
 CPU Cores: 16
 Physical Memory: 62.9 GB
 JVM Heap Size: 10.3 GB
 Flink Managed Memory: 24.9 GB

 any suggestions on how to move forward?
 Thanks,
 Fanbin

 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
 execution failed.
 at
 org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
 at
 org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
 ... 25 more

 *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
 HashWinAggWithKeys$534.processElement(Unknown Source)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)

>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong,

Thank you for the response.
Since I'm using Flink on EMR, where the latest available version is 1.9, the
second option is ruled out, but I will keep it in mind for a future upgrade.

I'm going to try the first option. It's probably a good idea to add an
example of that to the doc:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li  wrote:

> Hi Fanbin,
>
> Thanks for using blink batch mode.
>
> The OOM is caused by the manage memory not enough in Hash aggregation.
>
> There are three options you can choose from:
>
> 1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So
> you need increase hash memory:
> - table.exec.resource.hash-agg.memory: 1024 mb
>
> 2.In 1.10, we use slot manage memory to dynamic config real operator
> memory, so operator can use more manage memory, so you don't need configure
> hash agg memory anymore. You can try 1.10 RC0 [1]
>
> 3.We can use sort aggregation to avoid OOM too, but there is no config
> option now, I created JIRA to track it. [2]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
> [2] https://issues.apache.org/jira/browse/FLINK-15732
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu  wrote:
>
>>
>> tried to increase memory:
>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar
>>
>> and still got the same OOM exception.
>>
>> my sql is like:
>>
>> select id, hop_end(created_at, interval '30' second, interval '1' minute), 
>> sum(field)... #20 of these sums
>>
>> from table group by id, hop(created_at, interval '30' second, interval '1' 
>> minute)
>>
>>
>>
>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu  wrote:
>>
>>> Hi,
>>>
>>> I have a batch job using blink planner. and got the following error. I
>>> was able to successfully run the same job with flink 1.8 on yarn.
>>>
>>> I set conf as:
>>> taskmanager.heap.size: 5m
>>>
>>> and flink UI gives me
>>> Last Heartbeat: 20-01-22 14:56:25
>>> ID: container_1579720108062_0018_01_20
>>> Data Port: 41029
>>> Free Slots / All Slots: 1 / 0
>>> CPU Cores: 16
>>> Physical Memory: 62.9 GB
>>> JVM Heap Size: 10.3 GB
>>> Flink Managed Memory: 24.9 GB
>>>
>>> any suggestions on how to move forward?
>>> Thanks,
>>> Fanbin
>>>
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>> at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at
>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>> ... 25 more
>>>
>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.* at
>>> HashWinAggWithKeys$534.processElement(Unknown Source)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: batch job OOM

2020-01-22 Thread Jingsong Li
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the hash aggregation not having enough managed memory.

There are three options you can choose from:

1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration, so
you need to increase the hash-agg memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2. In 1.10, we use the slot's managed memory to dynamically size the actual
operator memory, so operators can use more managed memory and you no longer
need to configure the hash-agg memory. You can try 1.10 RC0 [1]

3. We could also use sort aggregation to avoid the OOM, but there is no
config option for it yet; I created a JIRA to track it. [2]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
[2] https://issues.apache.org/jira/browse/FLINK-15732
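[Editor's note] For options 1 and 2 above, a minimal configuration sketch. The
hash-agg key is the one named in the email; `taskmanager.memory.managed.size`
is an assumption based on the 1.10 memory model and should be verified against
the configuration docs for your Flink version:

```yaml
# Flink 1.9 (blink planner): give the hash aggregate more managed memory.
# In 1.9 this is read from the TableConfig, so it may need to be set
# programmatically rather than in flink-conf.yaml.
table.exec.resource.hash-agg.memory: 1024 mb

# Flink 1.10: operators size themselves from the slot's managed memory, so
# raising the TaskManager's managed memory is enough (key name assumed from
# the 1.10 memory model; verify against the 1.10 configuration docs).
taskmanager.memory.managed.size: 4gb
```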

Best,
Jingsong Lee
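[Editor's note] Option 3 above refers to sort-based aggregation as a fallback.
As a generic illustration of the trade-off (not Flink internals): hash
aggregation keeps one accumulator per distinct key in memory, so many distinct
keys can exhaust a fixed memory budget, while sort-based aggregation orders
rows by key first and then needs only one accumulator at a time.

```python
# Generic sketch of hash vs. sort aggregation (NOT Flink code).
from itertools import groupby
from operator import itemgetter

# Hash aggregation: state grows with the number of distinct keys.
def hash_agg(rows):
    acc = {}
    for key, value in rows:
        acc[key] = acc.get(key, 0) + value
    return acc

# Sort aggregation: after sorting by key, each group is consumed with O(1)
# aggregation state (in real engines the sort itself can spill to disk).
def sort_agg(rows):
    out = {}
    for key, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        out[key] = sum(v for _, v in group)
    return out

rows = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)]
assert hash_agg(rows) == sort_agg(rows) == {"a": 4, "b": 6, "c": 5}
```

Both produce the same result; they differ only in how much aggregation state
must be held in memory at once.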



Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I tried to increase memory:

flink run -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar

and still got the same OOM exception.

My SQL is like:

select id,
       hop_end(created_at, interval '30' second, interval '1' minute),
       sum(field), ...  -- 20 such sums
from table
group by id, hop(created_at, interval '30' second, interval '1' minute)
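[Editor's note] One thing worth double-checking in the command above: in the
Flink YARN CLI, -ytm and -yjm are interpreted as megabytes by default (an
assumption to verify against your client version), so "-ytm 20" would request
only 20 MB per TaskManager. A hedged sketch of a resubmission with the values
spelled out in MB:

```shell
# Hypothetical resubmission; flag semantics assumed (MB by default).
# 20 GB per TaskManager expressed in MB:
echo $((20 * 1024))
# flink run -m yarn-cluster -p 16 -ys 1 -ytm 20480 -yjm 8096 myjar
```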





batch job OOM

2020-01-22 Thread Fanbin Bu
Hi,

I have a batch job using the blink planner and got the following error. I was
able to run the same job successfully with Flink 1.8 on YARN.

I set conf as:
taskmanager.heap.size: 5m

and the Flink UI gives me:
Last Heartbeat: 20-01-22 14:56:25
ID: container_1579720108062_0018_01_20
Data Port: 41029
Free Slots / All Slots: 1 / 0
CPU Cores: 16
Physical Memory: 62.9 GB
JVM Heap Size: 10.3 GB
Flink Managed Memory: 24.9 GB

Any suggestions on how to move forward?

Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more

Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)