Thanks, here’s the debug output. It looks like we need to set up an hdfs-config
file in the Flink config.
Could you advise us further?
--
2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem
- Loading extension file systems via services
2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem
- Added file system
maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory
2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
- Cannot find hdfs-default configuration-file path in Flink config.
2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
- Cannot find hdfs-site configuration-file path in Flink config.
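In case it helps the discussion, here is roughly what we were thinking of trying on the
master node to point Flink at the Hadoop configuration (just a sketch; the
/etc/flink/conf and /etc/hadoop/conf paths are our assumption about the EMR layout):
# sketch: append the Hadoop config dir to the Flink configuration (paths assumed)
echo "fs.hdfs.hadoopconf: /etc/hadoop/conf" | sudo tee -a /etc/flink/conf/flink-conf.yaml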
From: Aaron Langford <[email protected]>
Date: Thursday, January 23, 2020 at 12:22 PM
To: Senthil Kumar <[email protected]>
Cc: Yang Wang <[email protected]>, "[email protected]"
<[email protected]>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
When creating your cluster, you can provide configurations that EMR will find
the right home for. Example for the aws cli:
aws emr create-cluster ... --configurations '[{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }]'
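Once the cluster is up, you can sanity-check that the classification actually landed by
looking at Flink's log4j file on the master node (the path below is from memory, so
treat it as a guess):
# sketch: confirm the DEBUG root logger made it into Flink's log4j config
grep rootLogger /etc/flink/conf/log4j.properties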
If you can't take down your existing EMR cluster for some reason, you can ask EMR to
apply these configurations to the running cluster. They should take effect when you
start a new Flink job (a new job manager as well as a new job in that job manager). It
is my understanding that configuration changes require a restart of the Flink
jobmanager + topology in order to take effect. Here's an example of how to modify an
existing cluster (I just threw this together, so beware malformed JSON):
aws emr modify-instance-groups --cli-input-json '{
  "ClusterId": "<your cluster id>",
  "InstanceGroups": [{
    "InstanceGroupId": "<master instance group id>",
    "Configurations": [{
      "Classification": "flink-log4j",
      "Properties": {
        "log4j.rootLogger": "DEBUG,file"
      }
    },{
      "Classification": "flink-log4j-yarn-session",
      "Properties": {
        "log4j.rootLogger": "DEBUG,stdout"
      }
    }]
  },{
    "InstanceGroupId": "<core instance group id>",
    "Configurations": [{
      "Classification": "flink-log4j",
      "Properties": {
        "log4j.rootLogger": "DEBUG,file"
      }
    },{
      "Classification": "flink-log4j-yarn-session",
      "Properties": {
        "log4j.rootLogger": "DEBUG,stdout"
      }
    }]
  }]
}'
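You can watch for the reconfiguration to finish with something like the following (a
sketch; it just lists the instance groups so you can see when the new configuration has
been applied):
# sketch: poll the instance groups while the reconfiguration is in progress
aws emr list-instance-groups --cluster-id <your cluster id>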
On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar
<[email protected]> wrote:
Could you tell us how to turn on debug level logs?
We attempted this (on the driver):
sudo stop hadoop-yarn-resourcemanager
then followed the instructions here:
https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager
and
sudo start hadoop-yarn-resourcemanager
but we still don’t see any debug-level logs.
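We were also considering just bumping Flink's own log4j config directly on the master
and restarting the job, roughly as below, but weren't sure whether that is the right
file on EMR (the path is a guess on our part):
# sketch: set Flink's root logger to DEBUG on the master node (assumed path)
sudo sed -i 's/^log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' /etc/flink/conf/log4j.properties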
Any further info is much appreciated!
From: Aaron Langford
<[email protected]>
Date: Tuesday, January 21, 2020 at 10:54 AM
To: Senthil Kumar <[email protected]>
Cc: Yang Wang <[email protected]>, "[email protected]"
<[email protected]>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
Senthil,
One of the key steps in debugging this for me was enabling debug-level logs on
my cluster, and then looking at the logs in the resource manager. The failure
you are after happens before the exceptions you have reported here. When your
Flink application is starting, it will attempt to load various file system
implementations. You can see which ones it successfully loaded once debug-level
logging is configured. You will have to do some digging, but this is a good place
to start. Try to determine whether your application is actually loading the s3 file
system. You should be able to find the file system implementations that were loaded
by searching for the string "Added file system".
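For example, something like this on a node running a task manager (the container log
path is my assumption about where YARN keeps logs on EMR):
# sketch: look for the file systems Flink registered at startup
grep -R "Added file system" /var/log/hadoop-yarn/containers/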
Also, do you mind sharing the bootstrap action script that you are using to get
the s3 file system in place?
Aaron
On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar
<[email protected]> wrote:
Yang, I appreciate your help! Please let me know if I can provide any other info.
I resubmitted my executable jar file as a step to the Flink EMR cluster, and here are
all the exceptions. I see two of them.
I fished them out of /var/log/Hadoop/<STEP-ID>/syslog
2020-01-21 16:31:37,587 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File
Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.
java.lang.NullPointerException
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split
Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File
Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from
RUNNING to FAILED.
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only
supported for HDFS and for Hadoop version 2.7 or newer
at
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
From: Yang Wang <[email protected]>
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar <[email protected]>
Cc: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
I think this exception is not because the Hadoop version isn't high enough.
It seems that the "s3" URI scheme could not be recognized by
`S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.
Could you share the debug-level jobmanager/taskmanager logs so that we can
confirm whether the classpath and FileSystem are loaded correctly?
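In the meantime, it may also be worth double-checking where the s3 filesystem jar
actually ended up on the nodes, for example (paths taken from the locations you
mentioned, so adjust as needed):
# sketch: check both the lib and plugins locations for the s3 filesystem jar
ls -l /usr/lib/flink/lib/flink-s3-fs-hadoop*.jar
ls -l /usr/lib/flink/plugins/s3-fs-hadoop/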
Best,
Yang
Senthil Kumar <[email protected]> wrote on Friday, January 17, 2020 at 10:57 PM:
Hello all,
Newbie here!
We are running on Amazon EMR with the following installed via the EMR Software
Configuration:
Hadoop 2.8.5
JupyterHub 1.0.0
Ganglia 3.7.2
Hive 2.3.6
Flink 1.9.0
I am trying to run a streaming job that reads from one S3 bucket and writes to another
S3 bucket using the StreamingFileSink.
I got the infamous exception:
Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in
/usr/lib/flink/lib
https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov
That did not work.
Further googling revealed that for Flink 1.9.0 and above (according to this):
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
it seems that I need to install the jar file in the plugins directory
(/usr/lib/flink/plugins/s3-fs-hadoop).
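Roughly what I ran to put the jar there (a sketch from memory; the jar in the current
directory is the copy of flink-s3-fs-hadoop-1.9.0.jar I had downloaded separately):
# sketch: create the plugin directory and drop the s3 hadoop filesystem jar into it
sudo mkdir -p /usr/lib/flink/plugins/s3-fs-hadoop
sudo cp flink-s3-fs-hadoop-1.9.0.jar /usr/lib/flink/plugins/s3-fs-hadoop/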
That did not work either.
At this point, I am not sure what to do and would appreciate some help!
Cheers
Kumar