Hi Justin,

I think this might be a problem in Flink's Kinesis consumer. The Flink
Kinesis consumer uses aws-java-sdk version 1.10.71, which does contain
the aforementioned method. However, the older version 1.10.46 does not
contain it yet. Thus, I suspect that YARN puts an older version of this
jar on the classpath. For these cases, I think we have to shade our
aws-java-sdk dependency so that it also works with older versions of
EMR.
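
To illustrate why a jar on the YARN classpath can win over the copy
bundled in the user jar: a standard Java URLClassLoader asks its parent
before looking at its own jars. The sketch below only demonstrates that
JDK default. It assumes the user-code class loader in this setup
delegates the same way, which has not been confirmed in this thread, and
the names ParentFirstDemo and resolvedByParent are made up for the
example.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.security.CodeSource;

// Sketch only: shows the JDK's default parent-first delegation.
// ParentFirstDemo and resolvedByParent are hypothetical names.
final class ParentFirstDemo {

    // Loads `cls` again through a child loader that can see the same
    // jar/directory as the parent, and reports whether the parent's copy won.
    static boolean resolvedByParent(Class<?> cls) {
        ClassLoader parent = cls.getClassLoader();
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        // Give the child its own view of the class when a location is known.
        URL[] urls = (src == null || src.getLocation() == null)
                ? new URL[0]
                : new URL[] { src.getLocation() };
        try (URLClassLoader child = new URLClassLoader(urls, parent)) {
            Class<?> loaded = child.loadClass(cls.getName());
            // Parent-first: the child does not define the class itself.
            return loaded.getClassLoader() != child;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("parent copy used: "
                + resolvedByParent(ParentFirstDemo.class));
    }
}
```

If the same holds on the cluster, an older com.amazonaws class visible
to the parent loader would shadow the newer one packaged in the user
jar, which is what shading is meant to avoid.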

In order to verify this, could you tell us which EMR version you're
running? Additionally, it would be helpful if you sent us the classpath
with which the Flink cluster was started on Yarn. You can find this
information at the beginning of your TaskManager log file. Thanks a lot.
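
In addition to the classpath from the log, a small check like the
following could show which jar a class is actually loaded from and
whether it declares the method in question. This is a minimal sketch
using only JDK reflection; ClassOrigin, locationOf and hasMethod are
hypothetical helper names, not part of Flink or the AWS SDK.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Sketch only: ClassOrigin, locationOf and hasMethod are hypothetical helpers.
final class ClassOrigin {

    // Jar or directory the class was loaded from; JDK classes have no code source.
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null)
                ? "<no code source>"
                : src.getLocation().toString();
    }

    // True if the class itself declares a method with this name (any signature).
    static boolean hasMethod(Class<?> cls, String methodName) {
        for (Method m : cls.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String className = args.length > 0
                ? args[0] : "com.amazonaws.SDKGlobalConfiguration";
        String methodName = args.length > 1
                ? args[1] : "isInRegionOptimizedModeEnabled";
        try {
            // initialize=false: inspect the class without running static initializers.
            Class<?> cls = Class.forName(
                    className, false, ClassOrigin.class.getClassLoader());
            System.out.println(className + " loaded from " + locationOf(cls));
            System.out.println(methodName + " present: " + hasMethod(cls, methodName));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

The result depends on the classpath it runs with, so on the laptop it
only describes the laptop. To learn something about the cluster, the two
helper calls would have to run inside the job on YARN (for example,
logged from the source function before the Kinesis client is created).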

Cheers,
Till

On Mon, Oct 31, 2016 at 8:22 PM, Justin Yan <justin....@remitly.com> wrote:

> Hi all - first time on the mailing list, so my apologies if I break
> protocol on anything.  Really excited to be using Flink, and hoping to be
> active here in the future!  Also, apologies for the length of this email -
> I tried to include details but may have gone overboard.
>
> The gist of my problem is an issue with packaging the Flink Kinesis
> Connector into my user code for execution on a YARN cluster in EMR -
> there's some dependency trouble happening, but after about 48 hours of
> attempts, I'm not sure how to make progress, and I'd really appreciate any
> ideas or assistance. Thank you in advance!
>
> ### First, Some Context.
>
> We're hoping to write our Flink jobs in Scala 2.11.  The Flink JM/TMs
> currently run on an EMR cluster with Hadoop 2.7 as YARN containers.  We run
> our jobs via an Azkaban server, which has the Hadoop and Flink clients
> installed, and the configurations are set to point at the YARN master on
> our EMR cluster (with $HADOOP_HOME set so Flink can discover the hadoop
> configs).  We're using Java OpenJDK7 everywhere, and Maven 3.3.9 when
> building Flink from source.
>
> We use SBT and the assembly plugin to create an Uberjar of our code and
> its dependencies.  This gets uploaded to Azkaban, whereupon the following
> command is run on the azkaban server to execute a Flink job:
>
> flink run -c <className> usercodeuberjar-assembly-1.0.jar
>
> I've successfully run a few flink jobs that execute on our EMR cluster in
> this fashion (the WordCount example, etc.).
>
> ### The Problem
>
> We use AWS Kinesis, and are hoping to integrate Flink with it.  Naturally,
> we were hoping to use the Kinesis connector:
> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>.
>
> After following the instructions with some experimentation, I was able to
> run a Flink Kinesis application on my laptop in Local Cluster mode.
> (Ubuntu 16.04, local cluster initiated with the `./start-local.sh`
> command, job submitted via `flink run -c <className>
> usercodeuberjar-assembly-1.0.jar`)
>
> I uploaded the same JAR to Azkaban and tried to run the same command to
> submit to our EMR cluster, and got a `java.lang.NoSuchMethodError:
> com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()`
> (I've included the full stack trace at the bottom of this email).  I went
> to inspect the uploaded JAR with a `unzip usercodeuberjar-assembly-1.0.jar`,
> looked in `com/amazonaws` and found the SDKGlobalConfiguration.class file.
> I decompiled and inspected it, and the isInRegionOptimizedModeEnabled
> method that was purportedly missing was indeed present.
>
> I've included the steps I took to manifest this problem below, along with
> a variety of things that I tried to do to resolve the problem - any help or
> insight is greatly appreciated!
>
> ### Repro
>
> I'm not sure how to provide a clear repro, but I'll try to include as much
> detail as I can about the sequence of actions and commands I ran since
> there may be some obvious mistakes:
>
> Downloading the flink release to my laptop:
>
> wget http://www-us.apache.org/dist/flink/flink-1.1.3/flink-1.1.3-bin-hadoop27-scala_2.11.tgz
> tar xfzv flink-1.1.3-bin-hadoop27-scala_2.11.tgz
>
> I then SSH'd into Azkaban, and ran the same two commands, while adding the
> bin/ directory to my PATH and tweaking the config for fs.hdfs.hadoopconf.
> Next, after getting the flink binaries, I went to fetch the source code in
> order to follow the instructions here:
> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>
>
> wget https://github.com/apache/flink/archive/release-1.1.3.tar.gz
> tar xfzv release-1.1.3.tar.gz
>
> Here, I wanted to leverage our EMR instance profile Role instead of
> passing in credentials, hence I wanted the AUTO value for the
> "aws.credentials.provider" config, which seems to have been added after
> 1.1.3 - I made a couple of small tweaks to AWSConfigConstants.java and
> AWSUtil.java to allow for that AUTO value.
>
> Next, we're using Scala 2.11, so per the instructions here, I changed the
> Scala version:
> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/building.html#scala-versions>
>
> tools/change-scala-version.sh 2.11
>
> Back to the Kinesis Connector documentation...
>
> mvn clean install -Pinclude-kinesis -DskipTests
> cd flink-dist
> mvn clean install -Pinclude-kinesis -DskipTests
>
> When running that second mvn clean install, I get some warnings about the
> maven shade plugin having conflicting versions.  I also get a "[WARNING]
> The requested profile "include-kinesis" could not be activated because it
> does not exist."
>
> At this point, the instructions are not too clear on what to do.  I
> proceed to this section to try and figure it out:
> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution>
>
> My goal is to package everything in my usercode JAR, and I'll try to do
> that with SBT.  My first try is to install the Flink Kinesis Connector JAR
> generated by mvn clean install to my local Maven Repo:
>
> mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.1.3.jar
>
> I then build the jar with a build.sbt that looks like this (extraneous
> details removed):
>
> scalaVersion in ThisBuild := "2.11.8"
>
> val flinkVersion = "1.1.3"
>
> val flinkDependencies = Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-kinesis" % flinkVersion
> )
>
> lazy val proj = (project in file(".")).
>   settings(
>     libraryDependencies ++= flinkDependencies
>   )
>
> After this builds, I unzip the jar and use JD to decompile the
> com.amazonaws.SDKGlobalConfiguration class file to see if the method in
> question is present or not (it is).  I then run the jar locally with a
> `flink run -c <className> usercodeuberjar-assembly-1.0.jar`, and I see it
> running just fine when navigating to localhost:8081.  I then upload this
> same JAR to our Azkaban server, and run the same `flink run -c <className>
> usercodeuberjar-assembly-1.0.jar` command to submit as a YARN application
> - this time, I get the `NoSuchMethodError`.
>
> I've tried a variety of permutations of this, so I'll attempt to list them
> out along with their results:
>
> 1. A non-Kinesis Flink job: I was able to successfully run the example
> WordCount Flink job as a YARN application.
> 2. I mvn installed the newly built flink-scala and flink-streaming-scala
> JARs to my local maven repository in case these were different - after
> building and running on Azkaban... same error.
> 3. Using the newly-built flink-dist JAR (with the -Pinclude-kinesis flag):
> After replacing the flink-dist JAR in the /lib dir on Azkaban (that the
> `flink` command runs), I still had the same error.
> 4. Packaging the JAR in different ways:
>     - I tried adding the flink-connector-kinesis JAR by adding it to a
> /lib directory in my SBT project for direct inclusion.  This actually
> caused the NoSuchMethodError to occur during *local* execution as well.
>     - I tried using mvn-assembly to package all of the
> flink-connector-kinesis dependencies into that JAR, and then added it to
> the /lib directory in my SBT project.  Local execution no longer fails,
> but submission from Azkaban still hits the same error.
> 5. I thought it might be a classpath issue (since my laptop doesn't have a
> hadoop installation, so I figured there may be some kind of collision with
> the AWS SDK included by EMR), so I set, on Azkaban, the environment
> variable FLINK_CLASSPATH=usercodeuberjar-assembly-1.0.jar in order to get
> it prepended - same error.
> 6.  I realized this wasn't necessarily doing anything to the resolution of
> classnames of the Flink job executing in YARN.  So I dug into the client
> source, which eventually led me to
> flink-clients/.../program/PackagedProgram.java, which has the following
> line of code setting the ClassLoader:
>
> this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(
>     getAllLibraries(), classpaths, getClass().getClassLoader());
>
> getAllLibraries() does seem to set the jar that you pass into the `flink`
> command at the top of the class resolution hierarchy, which, as my previous
> foray into decompilation shows, does seem to include the method that is
> supposedly missing.
>
> At this point, I ran out of ideas to investigate, and so I'm hoping
> someone here is able to help me.  Thanks in advance for reading this!
>
> Full Stack Trace:
>
> java.lang.NoSuchMethodError: com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()Z
>     at com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:35)
>     at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.createKinesisClient(AWSUtil.java:50)
>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:118)
>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:176)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:188)
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:198)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>     at java.lang.Thread.run(Thread.java:745)
>
