Senthil,

One of the key steps in debugging this for me was enabling debug-level logs
on my cluster and then looking at the logs in the resource manager. The
failure you are after happens before the exceptions you have reported here.
When your Flink application starts, it attempts to load various file system
implementations, and with debug-level logging configured you can see which
ones it loaded successfully. You will have to do some digging, but this is a
good place to start: try to find out whether your application is actually
loading the s3 file system. You should be able to find the file system
implementations that were loaded by searching for the string
"Added file system".
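
If it helps, that search can be sketched in a few lines of Python. This is
only a rough sketch: the function names are my own, the log path is whatever
your cluster uses, and it assumes nothing about the log line beyond it
containing the string above. The s3 check is a loose substring match on the
text that follows that string, so treat a hit as a hint, not proof.

```python
import sys
from typing import Iterable, List

# Search string quoted in this message; everything else here is illustrative.
MARKER = "Added file system"


def find_loaded_file_systems(lines: Iterable[str], marker: str = MARKER) -> List[str]:
    """Return every log line that contains the marker, without trailing newlines."""
    return [line.rstrip("\n") for line in lines if marker in line]


def mentions_scheme(matches: Iterable[str], scheme: str = "s3") -> bool:
    """True if any line mentions the scheme after the marker (case-insensitive substring)."""
    wanted = scheme.lower()
    return any(wanted in line.partition(MARKER)[2].lower() for line in matches)


if __name__ == "__main__":
    # Pass one or more debug-level log files as arguments.
    for path in sys.argv[1:]:
        with open(path, errors="replace") as handle:
            hits = find_loaded_file_systems(handle)
        print(f"{path}: {len(hits)} matching line(s)")
        for hit in hits:
            print("  " + hit)
        if not mentions_scheme(hits):
            print("  no s3 file system among the matches")
```

Run it against the jobmanager and taskmanager logs once debug logging is on.
If nothing mentioning s3 shows up, the file system was most likely never
loaded, which would fit the fallback Yang described.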

Also, do you mind sharing the bootstrap action script that you are using to
get the s3 file system in place?

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar <senthi...@vmware.com> wrote:

> Yang, I appreciate your help! Please let me know if I can provide any
> other info.
>
>
>
> I resubmitted my executable jar file as a step to the Flink EMR and here
> are all the exceptions. I see two of them.
>
>
>
> I fished them out of /var/log/Hadoop/<STEP-ID>/syslog
>
>
>
> 2020-01-21 16:31:37,587 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom
> File Source -> Sink: Unnamed (11/16)): Error during disposal of stream
> operator.
>
> java.lang.NullPointerException
>
>         at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> 2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task
> (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader:
> Custom File Source -> Sink: Unnamed (8/16)
> (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS and for Hadoop version 2.7 or newer
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
>
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>
>         at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>
>         at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>
>         at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>
>         at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> *From: *Yang Wang <danrtsey...@gmail.com>
> *Date: *Saturday, January 18, 2020 at 7:58 PM
> *To: *Senthil Kumar <senthi...@vmware.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> I think this exception is not because the hadoop version isn't high
> enough.
>
> It seems that the "s3" URI scheme could not be recognized by
> `S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.
>
>
>
> Could you share the debug-level jobmanager/taskmanager logs so that we
> can confirm whether the classpath and FileSystem are loaded correctly?
>
>
>
>
>
>
>
> Best,
>
> Yang
>
>
>
> On Fri, Jan 17, 2020 at 10:57 PM Senthil Kumar <senthi...@vmware.com> wrote:
>
> Hello all,
>
>
>
> Newbie here!
>
>
>
> We are running in Amazon EMR with the following installed in the EMR
> Software Configuration
>
> Hadoop 2.8.5
>
> JupyterHub 1.0.0
>
> Ganglia 3.7.2
>
> Hive 2.3.6
>
> Flink 1.9.0
>
>
>
> I am trying to get a streaming job from one S3 bucket into another S3
> bucket using the StreamingFileSink.
>
>
>
> I got the infamous exception:
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>
>
>
> According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in
> /usr/lib/flink/lib
>
>
> https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov
>
>
>
> That did not work.
>
>
>
> Further googling revealed that, for Flink 1.9.0 and above (according to this):
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>
>
>
> it seems that I need to install the jar file in the plugins directory
> (/usr/lib/flink/plugins/s3-fs-hadoop)
>
>
>
> That did not work either.
>
>
>
> At this point, I am not sure what to do and would appreciate some help!
>
>
>
> Cheers
>
> Kumar
>
>
>
>
