Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-24 Thread Senthil Kumar
Thanks, here’s the debug output. It looks like we need to set up an hdfs-config
file in the Flink config.
Could you advise us further?

--


2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem   
- Loading extension file systems via services

2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem   
- Added file system 
maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory

2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils 
- Cannot find hdfs-default configuration-file path in Flink config.

2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils 
- Cannot find hdfs-site configuration-file path in Flink config.
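
In case it matters, one option we are considering is pointing Flink at the
Hadoop configuration explicitly via the legacy fs.hdfs.hadoopconf key that the
HadoopUtils lookup above falls back to. A minimal sketch, assuming the EMR
default paths (untested on our side):

# append to Flink's config on the master node (paths assume EMR defaults)
sudo tee -a /etc/flink/conf/flink-conf.yaml <<'EOF'
fs.hdfs.hadoopconf: /etc/hadoop/conf
EOF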


From: Aaron Langford 
Date: Thursday, January 23, 2020 at 12:22 PM
To: Senthil Kumar 
Cc: Yang Wang , "user@flink.apache.org" 

Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

When creating your cluster, you can provide configurations that EMR will find 
the right home for. Example for the aws cli:

aws emr create-cluster ... --configurations '[{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }]'

If you can't take down your existing EMR cluster for some reason, you can ask 
AWS to modify these configurations for you on the cluster. They should take 
effect when you start a new Flink job (new job manager as well as a new job in 
that job manager). It is my understanding that configuration changes require a 
restart of a flink jobmanager + topology in order to take effect. Here's an 
example of how to modify an existing cluster (I just threw this together, so 
beware malformed JSON):

aws emr modify-instance-groups --cli-input-json '{
  "ClusterId": "",
  "InstanceGroups": [{
    "InstanceGroupId": "",
    "Configurations": [{
      "Classification": "flink-log4j",
      "Properties": {
        "log4j.rootLogger": "DEBUG,file"
      }
    },{
      "Classification": "flink-log4j-yarn-session",
      "Properties": {
        "log4j.rootLogger": "DEBUG,stdout"
      }
    }]
  },{
    "InstanceGroupId": "",
    "Configurations": [{
      "Classification": "flink-log4j",
      "Properties": {
        "log4j.rootLogger": "DEBUG,file"
      }
    },{
      "Classification": "flink-log4j-yarn-session",
      "Properties": {
        "log4j.rootLogger": "DEBUG,stdout"
      }
    }]
  }]
}'
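
(Note: as far as I know, reconfiguring instance groups on a running cluster
this way requires EMR release 5.21.0 or later.)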

On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar <senthi...@vmware.com> wrote:
Could you tell us how to turn on debug level logs?

We attempted this (on the driver):

sudo stop hadoop-yarn-resourcemanager

followed the instructions here
https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager

and

sudo start hadoop-yarn-resourcemanager

but we still don’t see any debug level logs

Any further info is much appreciated!


From: Aaron Langford <aaron.langfor...@gmail.com>
Date: Tuesday, January 21, 2020 at 10:54 AM
To: Senthil Kumar <senthi...@vmware.com>
Cc: Yang Wang <danrtsey...@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Senthil,

One of the key steps in debugging this for me was enabling debug level logs on 
my cluster, and then looking at the logs in the resource manager. The failure 
you are after happens before the exceptions you have reported here. When your 
Flink application is starting, it will attempt to load various file system 
implementations. You can see which ones it successfully loaded when you have 
the debug level of logs configured. You will have to do some digging, but this 
is a good place to start. Try to discover if your application is indeed loading 
the s3 file system, or if that is not happening. You should be able to find the 
file system implementations that were loaded by searching for the string "Added 
file system".

Also, do you mind sharing the bootstrap action script that you are using to get 
the s3 file system in place?

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <senthi...@vmware.com> wrote:
Yang, I appreciate your help! Please let me know if I can provide any
other info.

I resubmitted my executable jar file as a step to the flink EMR and h

Re: FileStreamingSink is using the same counter for different files

2020-01-24 Thread Pawel Bartoszek
I have looked into the source code and it looks like the same counter
value being used in two buckets is correct.
Each Bucket class
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
is passed partCounter in the constructor. Whenever a part file is rolled over,
the counter is incremented within the scope of this bucket. It can happen
that there are two or more active buckets whose counters are increased
independently inside them until they become equal. However, the
global max counter maintained by the Buckets class always keeps the max part
counter, so that when a new bucket is created it is passed the correct part
counter.

I have done my analysis based on the logs from my job. I highlighted the
same counter value used for part-0-8.

2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-6" for bucket
id=2020-01-24T14_54_00Z.
2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 received completion notification for checkpoint with id=7.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z
and bucketPath=s3://xxx
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-7" for bucket
id=2020-01-24T14_54_00Z.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 received completion notification for checkpoint with id=8.
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "*part-0-8*" for bucket
id=2020-01-24T14_55_00Z.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z
and bucketPath=s3://xxx
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z
and bucketPath=s3://xxx
2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-8" for bucket
id=2020-01-24T14_54_00Z.
2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 received completion notification for checkpoint with id=9.
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-9" for bucket
id=2020-01-24T14_55_00Z.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Sub

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-24 Thread Aaron Langford
This seems to confirm that the S3 file system implementation is not being
loaded when you start your job.

Can you share the details of how you are getting the
flink-s3-fs-hadoop artifact
onto your cluster? Are you simply ssh-ing to the master node and doing this
manually? Are you doing this via a bootstrap action? Timing of this
action would be relevant as well.
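
For reference, a typical bootstrap action for this might look like the sketch
below. The bucket and version are placeholders, and the timing caveat is the
important part: bootstrap actions run before EMR installs applications, so at
that point the jar can only be staged on the node, not yet dropped into
Flink's lib directory:

#!/bin/bash
# hypothetical bootstrap action: stage the jar on each node; copying it into
# Flink's lib/ has to happen after EMR installs Flink (e.g. in an EMR step).
aws s3 cp s3://my-bucket/flink-s3-fs-hadoop-1.9.0.jar /home/hadoop/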

Aaron

On Fri, Jan 24, 2020 at 8:12 AM Senthil Kumar  wrote:

> Thanks, here’s the debug output. It looks like we need to set up an
> hdfs-config file in the Flink config.
>
> Could you advise us further?
>
>
>
> --
>
>
>
> 2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem
>   - Loading extension file systems via services
>
> 2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem
>   - Added file system
> maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory
>
> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
>   - Cannot find hdfs-default configuration-file path in
> Flink config.
>
> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
>   - Cannot find hdfs-site configuration-file path in
> Flink config.
>
>
>
>
>
> From: Aaron Langford 
> Date: Thursday, January 23, 2020 at 12:22 PM
> To: Senthil Kumar 
> Cc: Yang Wang , "user@flink.apache.org" <
> user@flink.apache.org>
> Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> When creating your cluster, you can provide configurations that EMR will
> find the right home for. Example for the aws cli:
>
>
>
> aws emr create-cluster ... --configurations '[{
>     "Classification": "flink-log4j",
>     "Properties": {
>       "log4j.rootLogger": "DEBUG,file"
>     }
>   },{
>     "Classification": "flink-log4j-yarn-session",
>     "Properties": {
>       "log4j.rootLogger": "DEBUG,stdout"
>     }
>   }]'
>
>
>
> If you can't take down your existing EMR cluster for some reason, you can
> ask AWS to modify these configurations for you on the cluster. They should
> take effect when you start a new Flink job (new job manager as well as a
> new job in that job manager). It is my understanding that configuration
> changes require a restart of a flink jobmanager + topology in order to take
> effect. Here's an example of how to modify an existing cluster (I just
> threw this together, so beware malformed JSON):
>
>
>
> aws emr modify-instance-groups --cli-input-json '{
> "ClusterId": "",
> "InstanceGroups": [{
> "InstanceGroupId": "",
> "Configurations": [{
> "Classification": "flink-log4j",
> "Properties": {
> "log4j.rootLogger": "DEBUG,file"
> }
> },{
> "Classification": "flink-log4j-yarn-session",
> "Properties": {
> "log4j.rootLogger": "DEBUG,stdout"
> }
> }]
> },{
> "InstanceGroupId": "",
> "Configurations": [{
> "Classification": "flink-log4j",
> "Properties": {
> "log4j.rootLogger": "DEBUG,file"
> }
> },{
> "Classification": "flink-log4j-yarn-session",
> "Properties": {
> "log4j.rootLogger": "DEBUG,stdout"
> }
> }]
>  }]
> }'
>
>
>
> On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar 
> wrote:
>
> Could you tell us how to turn on debug level logs?
>
>
>
> We attempted this (on driver)
>
>
>
> sudo stop hadoop-yarn-resourcemanager
>
>
>
> followed the instructions here
>
>
> https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager
> 
>
>
>
> and
>
>
>
> sudo start hadoop-yarn-resourcemanager
>
>
>
> but we still don’t see any debug level logs
>
>
>
> Any further info is much appreciated!
>
>
>
>
>
> From: Aaron Langford 
> Date: Tuesday, January 21, 2020 at 10:54 AM
> To: Senthil Kumar 
> Cc: Yang Wang , "user@flink.apache.org" <
> user@flink.apache.org>
> Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> Senthil,
>
>
>
> One of the key steps in debugging this for me was enabling debug level
> logs on my cluster, and then looking at the logs in the resource manager.
> The failure you are after happens before the exceptions you have reported
> here. When your Flink application is starting, it will attempt to load
> various file system implementations. You can see which ones it successfully
> loaded when you have the debug level of logs configured. You will have to
> do some digging, but this is a good place to start. Try to discover if your

Is there anything strictly special about sink functions?

2020-01-24 Thread Andrew Roberts
Hello,

I’m trying to push some behavior that we’ve currently got in a large, stateful 
SinkFunction implementation into Flink’s windowing system. The task at hand is 
similar to what StreamingFileSink provides, but more flexible. I don’t want to 
re-implement that sink, because it uses the 
StreamingRuntimeContext.getProcessingTimeService() via a cast - that class is 
marked as internal, and I’d like to avoid the exposure to an interface that 
could change. Extending it similarly introduces complexity I would rather not 
add to our codebase.

WindowedStream.process() provides more or less the pieces I need, but the 
stream continues on after a ProcessFunction - there’s no way to process() 
directly into a sink. I could use a ProcessFunction[In, Unit, Key, Window], and 
follow that immediately with a no-op sink that discards the Unit values, or I 
could just leave the stream "unfinished", with no sink.

Is there a downside to either of these approaches? Is there anything special 
about doing sink-like work in a ProcessFunction or FlatMapFunction instead of a 
SinkFunction?

Thanks,

Andrew





1.9.2 Release Date?

2020-01-24 Thread Hailu, Andreas
Hi,

Do we have any thoughts on a release date for 1.9.2? I've been eyeing 
FLINK-13184 particularly to 
help alleviate stress on our RM + Name Node and reduce noise/delays due to 
sporadic Task Manager timeouts. We submit thousands of jobs per hour, so this 
looks like it could be a big help.

Best,
Andreas Hailu



Re: FileStreamingSink is using the same counter for different files

2020-01-24 Thread Kostas Kloudas
Hi Pawel,

You are correct that counters are unique within the same bucket but
NOT across buckets. Across buckets, you may see the same counter being
used.
The max counter is used only upon restoring from a failure, resuming
from a savepoint, or rescaling, and this is done to guarantee that no
valid data are overwritten while limiting the state that Flink has to
keep internally. For a more detailed discussion about the why, you can
have a look here: https://issues.apache.org/jira/browse/FLINK-13609

Cheers,
Kostas

On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek
 wrote:
>
> I have looked into the source code and it looks like the same counter 
> value being used in two buckets is correct.
> Each Bucket class 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
>  is passed partCounter in the constructor. Whenever a part file is rolled over, 
> the counter is incremented within the scope of this bucket. It can happen 
> that there are two or more active buckets whose counters are increased 
> independently inside them until they become equal. However, the global max 
> counter maintained by the Buckets class always keeps the max part counter, so 
> that when a new bucket is created it is passed the correct part counter.
>
> I have done my analysis based on the logs from my job. I highlighted the same 
> counter value used for part-0-8.
>
> 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
> 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> received completion notification for checkpoint with id=7.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing for checkpoint with id=8 (max part counter=7).
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
> checkpoint.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
> element
> 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> received completion notification for checkpoint with id=8.
> 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to 
> element
> 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing for checkpoint with id=9 (max part counter=9).
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
> checkpoint.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z on 
> checkpoint.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
> element
> 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening 

Re: 1.9.2 Release Date?

2020-01-24 Thread Arvid Heise
Hi Andreas,

Voting for 1.9.2-rc1 started 9h before you wrote your email. [1]
If no one finds a bug or raises other concerns, 1.9.2 should be available
next week.
We are always happy about feedback, so if you have the option to test that
rc1, please do.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-9-2-release-candidate-1-td36943.html
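
If you want to try it, a minimal smoke test might look like the sketch below.
The URL follows the usual Apache dist-dev layout for Flink release candidates;
the exact artifact names are an assumption:

# download the RC binaries and run a bundled example (names are assumptions)
wget https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/flink-1.9.2-bin-scala_2.11.tgz
tar xzf flink-1.9.2-bin-scala_2.11.tgz && cd flink-1.9.2
./bin/start-cluster.sh
./bin/flink run examples/streaming/WordCount.jar
./bin/stop-cluster.sh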

On Fri, Jan 24, 2020 at 8:22 PM Hailu, Andreas  wrote:

> Hi,
>
>
>
> Do we have any thoughts on a release date for 1.9.2? I’ve been eyeing
> FLINK-13184 
> particularly to help alleviate stress on our RM + Name Node and reduce
> noise/delays due to sporadic Task Manager timeouts. We submit thousands of
> jobs per hour, so this looks like it could be a big help.
>
>
>
> Best,
>
> Andreas Hailu


Re: batch job OOM

2020-01-24 Thread Bowen Li
Hi Fanbin,

You can install your own Flink build on AWS EMR, which frees you from
EMR's release cycles.
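
Roughly like the sketch below; the tag and the destination path are
assumptions, and build-target/ is the symlink the Flink build creates for the
assembled distribution:

# build Flink from source and copy the distribution onto the EMR master
git clone https://github.com/apache/flink.git && cd flink
git checkout release-1.9.1
mvn clean install -DskipTests
scp -r build-target/ hadoop@<emr-master-dns>:/home/hadoop/flink-custom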

On Thu, Jan 23, 2020 at 03:36 Jingsong Li  wrote:

> Fanbin,
>
> I have no idea now. Can you create a JIRA to track it? You can describe
> the complete SQL and some data information.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> Do you have any suggestions to debug the above mentioned
>> IndexOutOfBoundsException error?
>> Thanks,
>>
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu 
>> wrote:
>>
>>> I got the following error when running another job. any suggestions?
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException
>>> at
>>> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>>> at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>>> at HashWinAggWithKeys$538.endInput(Unknown Source)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 I had set the config value too large. After I changed it to a smaller
 number it works now!
 Thank you for the help, really appreciate it!

 Fanbin

 On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
 wrote:

> Fanbin,
>
> Looks like your config is wrong, can you show your config code?
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> Great, now i got a different error:
>>
>> java.lang.NullPointerException: Initial Segment may not be null
>>  at 
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
>>  at 
>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
>>  at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
>> is there any other config i should add?
>>
>> thanks,
>>
>> Fanbin
>>
>>
>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
>> wrote:
>>
>>> you beat me to it.
>>> let me try that.
>>>
>>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 Document is here:
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
 NOTE: you need to configure this in TableConfig.

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
 wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using Flink on EMR and the latest version there is 1.9,
> the second option is ruled out, but I will keep that in mind for a future
> upgrade.
>
> I'm going to try the first option. It's probably a good idea to
> add that to the doc, for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <
> jingsongl...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by there not being enough managed memory for the hash
>> aggregation.
>>
>> There are three options you can choose from:
>>
>> 1. Is your version Flink 1.9? 1.9 still