Thanks for the answer, Ken.

My understanding is that file system selection is driven by the following
sections in core-site.xml:
<property>
  <name>fs.s3.impl</name>
  <!--<value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>--> <!-- This
was the original value -->
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>

If I run the program using configuration above with s3n (and also modifying
credential keys to use s3n) it fails with the same error, but there is no
"... opening key ..." logs. S3a seems to be not supported, it fails with
the following:
Caused by: java.io.IOException: No file system found with scheme s3a,
referenced in file URI 's3a://<my key>'.
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
... 23 more

I am puzzled by the fact that EMRFS is still apparently referenced
somewhere as an implementation for S3 protocol, I'm not able to locate
where this configuration is set.


On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.
>
> Though EMR has some support for magically treating the s3 protocol as s3n
> (or maybe s3a now, with Hadoop 2.6 or later)
>
> What happens if you use s3n://<key info>/<path to file> for the --input
> parameter?
>
> — Ken
>
> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
> Hello,
>
> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded
> with a three-step procedure: load data from S3 to cluster's HDFS, run Flink
> Job, unload outputs from HDFS to S3.
>
> However, ideally I'd like to read/write data directly from/to S3. I
> followed the instructions here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
> more specifically I:
>   1. Modified flink-conf to point to /etc/hadoop/conf
>   2. Modified core-site.xml per link above (not clear why why it is not
> using IAM, I had to provide AWS keys explicitly).
>
> Run the following command:
> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster
> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
> s3://<my key> --output hdfs:///flink-output
>
> First, I see messages like that:
> 2016-04-04 21:37:10,418 INFO
> org.apache.hadoop.fs.s3native.NativeS3FileSystem              - Opening key
> '<my key>' for reading at position '333000'
>
> Then, it fails with the following error:
>
> ------------------------------------------------------------
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
> (WordCount Example)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org
> <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
> at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.flink.runtime.JobException: Creating the input
> splits caused an error:
> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org
> <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>
> ... 21 more
>
> Caused by: java.lang.NoSuchMethodError:
> org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>
> at
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
>
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
>
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>
> ... 23 more
>
>
> Somehow, it's still tries to use EMRFS (which may be a valid thing?), but
> it is failing to initialize. I don't know enough about EMRFS/S3 interop so
> don't know how diagnose it further.
>
> I run Flink 1.0.0 compiled for Scala 2.11.
>
> Any advice on how to make it work is highly appreciated.
>
>
> Thanks,
>
> Timur
>
>
>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>
>
>

Reply via email to