Hi Andy,

this looks to me indeed like a dependency problem. I assume that EMR or
something else is pulling in an incompatible version of Hadoop.

The classpath you've posted, is this the one logged in the log files
(TaskManager log) or did you compile it yourself? In the latter case, it
would also be helpful to get access to the TaskManager logs.

Cheers,
Till

On Mon, Oct 2, 2017 at 10:20 PM, Andy M. <ajm2...@gmail.com> wrote:

> Hi Fabian,
>
> 1) I have looked at the linked docs, and from what I can tell no setup
> should really need to be done to get Flink working(Other than downloading
> the correct binaries, which I believe I did)
> 2) I have downloaded the Flink 1.3.2 binaries(flink-1.3.2-bin-
> hadoop27-scala_2.11.tgz
> <http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-
> hadoop27-scala_2.11.tgz>)
> This is for hadoop 2.7.X, which matches EMR 5.8.0.
>
> I appreciate any help or guidance you can provide me in fixing my problems,
> please let me know if there is anything else I can provide you.
>
> Thank you
>
> On Mon, Oct 2, 2017 at 4:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Andy,
> >
> > I'm not an AWS expert, so I'll just check on some common issues.
> >
> > I guess you already had a look at the Flink docs for AWS/EMR but I'll
> post
> > the link just be to sure [1].
> >
> > Since you are using Flink 1.3.2 (EMR 5.8.0 comes with Flink 1.3.1) did
> you
> > built Flink yourself or did you download the binaries?
> > Does the Hadoop version of the Flink build match the Hadoop version of
> EMR
> > 5.8.0, i.e., Hadoop 2.7.x?
> >
> > Best, Fabian
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/aws.html
> >
> > 2017-10-02 21:51 GMT+02:00 Andy M. <ajm2...@gmail.com>:
> >
> > > Hi Fabian,
> > >
> > > Sorry, I just realized I forgot to include that part.  The error
> returned
> > > is:
> > >
> > > java.lang.NoSuchMethodError:
> > > org.apache.hadoop.conf.Configuration.addResource(
> > Lorg/apache/hadoop/conf/
> > > Configuration;)V
> > >     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(
> > > EmrFileSystem.java:93)
> > >     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.
> > > initialize(HadoopFileSystem.java:328)
> > >     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> > > FileSystem.java:350)
> > >     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> > >     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> > >     at org.apache.flink.runtime.state.filesystem.
> > > FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
> > >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> > > createStreamFactory(FsStateBackend.java:282)
> > >     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.
> > > createStreamFactory(RocksDBStateBackend.java:273
> > >
> > > I believe it has something to do with the classpath, but I am unsure
> why
> > or
> > > how to fix it.  The classpath being used during the execution is:
> > > /home/hadoop/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/
> > > ho‌​me/hadoop/flink-1.3.‌​2/lib/flink-shaded-h‌​adoop2-
> > > uber-1.3.2.ja‌​r:/home/hadoop/flink‌​-1.3.2/lib/log4j-1.2‌​.
> > > 17.jar:/home/hadoop‌​/flink-1.3.2/lib/slf‌​4j-log4j12-1.7.7.
> > > jar‌​:/home/hadoop/flink-‌​1.3.2/lib/flink-dist‌​_2.11-1.3.
> > > 2.jar::/et‌​c/hadoop/conf:
> > >
> > > I decompiled flink-shaded-h‌​adoop2-uber-1.3.2.ja‌​r and it seems the
> > > addResource function does seem to be there.
> > >
> > > Thank you
> > >
> > > On Mon, Oct 2, 2017 at 3:43 PM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> > >
> > > > Hi Andy,
> > > >
> > > > can you describe in more detail what exactly isn't working?
> > > > Do you see error messages in the log files or on the console?
> > > >
> > > > Thanks, Fabian
> > > >
> > > > 2017-10-02 15:52 GMT+02:00 Andy M. <ajm2...@gmail.com>:
> > > >
> > > > > Hello,
> > > > >
> > > > > I am about to deploy my first Flink projects  to production, but I
> am
> > > > > running into a very big hurdle.  I am unable to launch my project
> so
> > it
> > > > can
> > > > > write to an S3 bucket.  My project is running on an EMR cluster,
> > where
> > > I
> > > > > have installed Flink 1.3.2.  I am using Yarn to launch the
> > application,
> > > > and
> > > > > it seems to run fine unless I am trying to enable check
> > pointing(with a
> > > > S3
> > > > > target).  I am looking to use RocksDB as my check-pointing backend.
> > I
> > > > have
> > > > > asked a few places, and I am still unable to find a solution to
> this
> > > > > problem.  Here are my steps for creating a cluster, and launching
> my
> > > > > application, perhaps I am missing a step.  I'd be happy to provide
> > any
> > > > > additional information if needed.
> > > > >
> > > > > AWS Portal:
> > > > >
> > > > >     1) EMR -> Create Cluster
> > > > >     2) Advanced Options
> > > > >     3) Release = emr-5.8.0
> > > > >     4) Only select Hadoop 2.7.3
> > > > >     5) Next -> Next -> Next -> Create Cluster ( I do fill out
> > > > > names/keys/etc)
> > > > >
> > > > > Once the cluster is up I ssh into the Master and do the following:
> > > > >
> > > > >     1  wget
> > > > > http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-
> > > > > hadoop27-scala_2.11.tgz
> > > > >     2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
> > > > >     3  cd flink-1.3.2
> > > > >     4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
> > > > >     5  Change conf/flink-conf.yaml
> > > > >     6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar
> > > > >
> > > > > My conf/flink-conf.yaml I add the following fields:
> > > > >
> > > > >     state.backend: rocksdb
> > > > >     state.backend.fs.checkpointdir: s3:/bucket/location
> > > > >     state.checkpoints.dir: s3:/bucket/location
> > > > >
> > > > > My program's checkpointing setup:
> > > > >
> > > > >
> > > > > env.enableCheckpointing(getCheckpointRate,
> CheckpointingMode.EXACTLY_
> > > > ONCE)
> > > > >
> > > > > env.getCheckpointConfig.enableExternalizedCheckpoints(
> > > > > ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> > > > >
> > > > > env.getCheckpointConfig.setMinPauseBetweenCheckpoints(
> > > > > getCheckpointMinPause)
> > > > >     env.getCheckpointConfig.setCheckpointTimeout(
> > getCheckpointTimeout)
> > > > >     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> > > > >     env.setStateBackend(new RocksDBStateBackend("s3://
> > > bucket/location",
> > > > > true))
> > > > >
> > > >
> > >
> >
>

Reply via email to