Hey Timur,

if you are using EMR with IAM roles, Flink should work out of the box.
You don't need to change the Hadoop config and the IAM role takes care
of setting up all credentials at runtime. You don't need to hardcode
any keys in your application that way and this is the recommended way
to go in order to not worry about securely exchanging the keys and
then keeping them secure afterwards.

With EMR 4.4.0 you have to use a Flink binary version built against
Hadoop 2.7. Did you do that? Can you please retry with an
out-of-the-box Flink and just run it like this:

HADOOP_CONF_DIR =/etc/hadoop/conf bin/flink etc.

Hope this helps! Please report back. :-)

– Ufuk


On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> Hello Ufuk,
>
> I'm using EMR 4.4.0.
>
> Thanks,
> Timur
>
> On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Timur,
>>
>> which EMR version are you using?
>>
>> – Ufuk
>>
>> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <timur.fairu...@gmail.com>
>> wrote:
>> > Thanks for the answer, Ken.
>> >
>> > My understanding is that file system selection is driven by the
>> > following
>> > sections in core-site.xml:
>> > <property>
>> >   <name>fs.s3.impl</name>
>> >   <!--<value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>--> <!--
>> > This
>> > was the original value -->
>> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>> > </property>
>> >
>> > <property>
>> >   <name>fs.s3n.impl</name>
>> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>> > </property>
>> >
>> > If I run the program using configuration above with s3n (and also
>> > modifying
>> > credential keys to use s3n) it fails with the same error, but there is
>> > no
>> > "... opening key ..." logs. S3a seems to be not supported, it fails with
>> > the
>> > following:
>> > Caused by: java.io.IOException: No file system found with scheme s3a,
>> > referenced in file URI 's3a://<my key>'.
>> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
>> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> > at
>> >
>> > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> > at
>> >
>> > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> > at
>> >
>> > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> > ... 23 more
>> >
>> > I am puzzled by the fact that EMRFS is still apparently referenced
>> > somewhere
>> > as an implementation for S3 protocol, I'm not able to locate where this
>> > configuration is set.
>> >
>> >
>> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler
>> > <kkrugler_li...@transpac.com>
>> > wrote:
>> >>
>> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
>> >> s3.
>> >>
>> >> Though EMR has some support for magically treating the s3 protocol as
>> >> s3n
>> >> (or maybe s3a now, with Hadoop 2.6 or later)
>> >>
>> >> What happens if you use s3n://<key info>/<path to file> for the --input
>> >> parameter?
>> >>
>> >> — Ken
>> >>
>> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <timur.fairu...@gmail.com>
>> >> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
>> >> succeeded
>> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
>> >> Flink
>> >> Job, unload outputs from HDFS to S3.
>> >>
>> >> However, ideally I'd like to read/write data directly from/to S3. I
>> >> followed the instructions here:
>> >>
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
>> >> more specifically I:
>> >>   1. Modified flink-conf to point to /etc/hadoop/conf
>> >>   2. Modified core-site.xml per link above (not clear why why it is not
>> >> using IAM, I had to provide AWS keys explicitly).
>> >>
>> >> Run the following command:
>> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
>> >> yarn-cluster
>> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar
>> >> --input
>> >> s3://<my key> --output hdfs:///flink-output
>> >>
>> >> First, I see messages like that:
>> >> 2016-04-04 21:37:10,418 INFO
>> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem              - Opening
>> >> key
>> >> '<my key>' for reading at position '333000'
>> >>
>> >> Then, it fails with the following error:
>> >>
>> >> ------------------------------------------------------------
>> >>
>> >>  The program finished with the following exception:
>> >>
>> >>
>> >> org.apache.flink.client.program.ProgramInvocationException: The program
>> >> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
>> >> (WordCount Example)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>> >>
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>
>> >> at
>> >>
>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>
>> >> at
>> >>
>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>
>> >> at java.lang.reflect.Method.invoke(Method.java:606)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>
>> >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>
>> >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>
>> >> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>> >> Failed
>> >> to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>> >>
>> >> at
>> >>
>> >> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
>> >>
>> >> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>> >>
>> >> at
>> >>
>> >> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> >>
>> >> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> >>
>> >> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>> >>
>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> >>
>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> >>
>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> >>
>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> >>
>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> >>
>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>
>> >> at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>
>> >> at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>
>> >> at
>> >>
>> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>
>> >> Caused by: org.apache.flink.runtime.JobException: Creating the input
>> >> splits caused an error:
>> >>
>> >> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>> >>
>> >> ... 21 more
>> >>
>> >> Caused by: java.lang.NoSuchMethodError:
>> >>
>> >> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >>
>> >> at
>> >>
>> >> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
>> >>
>> >> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
>> >>
>> >> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> >>
>> >> ... 23 more
>> >>
>> >>
>> >> Somehow, it's still tries to use EMRFS (which may be a valid thing?),
>> >> but
>> >> it is failing to initialize. I don't know enough about EMRFS/S3 interop
>> >> so
>> >> don't know how diagnose it further.
>> >>
>> >> I run Flink 1.0.0 compiled for Scala 2.11.
>> >>
>> >> Any advice on how to make it work is highly appreciated.
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Timur
>> >>
>> >>
>> >>
>> >>
>> >> --------------------------
>> >> Ken Krugler
>> >> +1 530-210-6378
>> >> http://www.scaleunlimited.com
>> >> custom big data solutions & training
>> >> Hadoop, Cascading, Cassandra & Solr
>> >>
>> >>
>> >>
>> >>
>> >>
>> >
>
>

Reply via email to