Hi all - first time on the mailing list, so my apologies if I break
protocol on anything.  Really excited to be using Flink, and hoping to be
active here in the future!  Also, apologies for the length of this email -
I tried to include details but may have gone overboard.

The gist of my problem: dependency trouble when packaging the Flink
Kinesis connector into my user code for execution on a YARN cluster in
EMR.  After about 48 hours of attempts, I'm not sure how to make progress,
and I'd really appreciate any ideas or assistance.  Thank you in advance!

### First, Some Context.

We're hoping to write our Flink jobs in Scala 2.11.  The Flink JM/TMs
currently run as YARN containers on an EMR cluster with Hadoop 2.7.  We
run our jobs via an Azkaban server, which has the Hadoop and Flink clients
installed and is configured to point at the YARN master on our EMR cluster
(with $HADOOP_HOME set so Flink can discover the Hadoop configs).  We're
using OpenJDK 7 everywhere, and Maven 3.3.9 when building Flink from
source.

We use SBT and the assembly plugin to create an uberjar of our code and its
dependencies.  This gets uploaded to Azkaban, whereupon the following
command is run on the Azkaban server to execute a Flink job:

flink run -c <className> usercodeuberjar-assembly-1.0.jar

I've successfully run a few Flink jobs on our EMR cluster in this fashion
(the WordCount example, etc.).

### The Problem

We use AWS Kinesis, and are hoping to integrate Flink with it.  Naturally,
we were hoping to use the Kinesis connector:
<https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>.

After following the instructions with some experimentation, I was able to
run a Flink Kinesis application on my laptop in Local Cluster mode
(Ubuntu 16.04, local cluster initiated with the `./start-local.sh`
command, job submitted via `flink run -c <className>
usercodeuberjar-assembly-1.0.jar`).

I uploaded the same JAR to Azkaban and ran the same command to submit to
our EMR cluster, and got a `java.lang.NoSuchMethodError:
com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()`
(I've included the full stack trace at the bottom of this email).  I
inspected the uploaded JAR with `unzip usercodeuberjar-assembly-1.0.jar`,
looked in `com/amazonaws`, and found the SDKGlobalConfiguration.class
file.  I decompiled and inspected it, and the
isInRegionOptimizedModeEnabled method that was purportedly missing was
indeed present.
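
For what it's worth, here's a quick reflective check (just a sketch I can
run in a Scala REPL on any given classpath) to confirm the method's
presence and - more usefully - to print which JAR the class is actually
loaded from:

// Prints where com.amazonaws.SDKGlobalConfiguration was loaded from,
// and whether the supposedly missing method is visible on it.
val clazz = Class.forName("com.amazonaws.SDKGlobalConfiguration")
println(clazz.getProtectionDomain.getCodeSource.getLocation)
println(clazz.getMethods.exists(_.getName == "isInRegionOptimizedModeEnabled"))

If that location on the cluster were to point at an EMR-provided AWS SDK
JAR rather than my uberjar, that would explain the mismatch.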

I've included the steps I took to reproduce this problem below, along with
a variety of things I tried in order to resolve it - any help or insight
is greatly appreciated!

### Repro

I'm not sure how to provide a clean repro, but I'll try to include as much
detail as I can about the sequence of actions and commands I ran, since
there may be some obvious mistakes:

Downloading the Flink release to my laptop:

wget http://www-us.apache.org/dist/flink/flink-1.1.3/flink-1.1.3-bin-hadoop27-scala_2.11.tgz
tar xfzv flink-1.1.3-bin-hadoop27-scala_2.11.tgz

I then SSH'd into Azkaban and ran the same two commands, while adding the
bin/ directory to my PATH and tweaking the config for fs.hdfs.hadoopconf.
Next, after getting the Flink binaries, I went to fetch the source code in
order to follow the instructions here:
<https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>

wget https://github.com/apache/flink/archive/release-1.1.3.tar.gz
tar xfzv release-1.1.3.tar.gz

Here, I wanted to leverage our EMR instance profile role instead of
passing in credentials, so I wanted the AUTO value for the
"aws.credentials.provider" config, which seems to have been added after
1.1.3.  I made a couple of small tweaks to AWSConfigConstants.java and
AWSUtil.java to allow for that AUTO value.
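
For reference, the consumer setup in my job looks roughly like this (a
simplified sketch - the stream name and region are placeholders, AUTO is
the value I back-ported, and I'm writing the config keys as string
literals here; in the real code they come from AWSConfigConstants):

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

val consumerConfig = new Properties()
// Region of the Kinesis stream (placeholder value).
consumerConfig.put("aws.region", "us-east-1")
// AUTO = fall through to the default AWS credentials chain, which picks
// up the EMR instance profile role (the back-ported value).
consumerConfig.put("aws.credentials.provider", "AUTO")

val kinesis = env.addSource(new FlinkKinesisConsumer[String](
  "my-stream", new SimpleStringSchema(), consumerConfig))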

Next, we're using Scala 2.11, so per the instructions here, I changed the
Scala version:
<https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/building.html#scala-versions>

tools/change-scala-version.sh 2.11

Back to the Kinesis Connector documentation...

mvn clean install -Pinclude-kinesis -DskipTests
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests

When running that second mvn clean install, I get some warnings about
conflicting versions of the maven-shade-plugin.  I also get a "[WARNING]
The requested profile "include-kinesis" could not be activated because it
does not exist."

At this point, the instructions are not too clear on what to do.  I proceed
to this section to try and figure it out:
<https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution>

My goal is to package everything into my usercode JAR, and I'll try to do
that with SBT.  My first try is to install the Flink Kinesis Connector JAR
generated by the mvn clean install into my local Maven repo:

mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.1.3.jar

I then build the jar with a build.sbt that looks like this (extraneous
details removed):

scalaVersion in ThisBuild := "2.11.8"

val flinkVersion = "1.1.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kinesis" % flinkVersion
)

lazy val proj = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )
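
(A packaging variant I've been considering, sketched below but not yet
tested end-to-end: marking the core Flink artifacts as "provided", since
flink-dist already puts them on the cluster classpath, so that only the
connector and its AWS dependencies land in the uberjar.)

val flinkDependencies = Seq(
  // Core Flink ships on the cluster in flink-dist, so it arguably
  // shouldn't be re-bundled into the uberjar.
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // The Kinesis connector is NOT part of flink-dist, so it stays
  // compile-scoped and gets assembled into the uberjar.
  "org.apache.flink" %% "flink-connector-kinesis" % flinkVersion
)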

After building with the original build.sbt above, I unzip the jar and use
JD to decompile the com.amazonaws.SDKGlobalConfiguration class file to see
whether the method in question is present (it is).  I then run the jar
locally with `flink run -c <className> usercodeuberjar-assembly-1.0.jar`,
and I see it running just fine when navigating to localhost:8081.  I then
upload this same JAR to our Azkaban server and run the same `flink run -c
<className> usercodeuberjar-assembly-1.0.jar` command to submit it as a
YARN application - this time, I get the `NoSuchMethodError`.

I've tried a variety of permutations of this, so I'll attempt to list them
out along with their results:

1. A non-Kinesis Flink job: I was able to successfully run the example
WordCount Flink job as a YARN application.
2. I mvn installed the newly built flink-scala and flink-streaming-scala
JARs to my local maven repository in case these were different - after
building and running on Azkaban... same error.
3. Using the newly-built flink-dist JAR (with the -Pinclude-kinesis flag):
after replacing the flink-dist JAR in the /lib dir on Azkaban (the one the
`flink` command runs), I still had the same error.
4. Packaging the JAR in different ways (a shading sketch I haven't tried
yet follows this list):
    - I tried adding the flink-connector-kinesis JAR to a /lib directory
in my SBT project for direct inclusion.  This actually caused the
NoSuchMethodError to occur during *local* execution as well.
    - I tried using mvn-assembly to package all of the
flink-connector-kinesis dependencies into that JAR, and then added it to
the /lib directory in my SBT project.  Local execution no longer has the
error, but submission from Azkaban still fails with the same error.
5. I thought it might be a classpath issue (my laptop doesn't have a
Hadoop installation, so I figured there may be some kind of collision with
the AWS SDK included by EMR), so on Azkaban I set the environment variable
FLINK_CLASSPATH=usercodeuberjar-assembly-1.0.jar in order to get it
prepended - same error.
6. I realized this wasn't necessarily doing anything to the resolution of
class names for the Flink job executing in YARN.  So I dug into the client
source, which eventually led me to
flink-clients/.../program/PackagedProgram.java, which has the following
line of code setting the ClassLoader:

this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(
    getAllLibraries(), classpaths, getClass().getClassLoader());

getAllLibraries() does seem to put the jar that you pass to the `flink`
command at the top of the class resolution hierarchy, which, as my earlier
foray into decompilation shows, does seem to include the method that is
supposedly missing.
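
Relatedly, the shading approach mentioned in item 4 above is one thing I
haven't actually tried yet: relocating the bundled AWS SDK classes so they
can't collide with whatever SDK version EMR puts on the classpath.  A
minimal sbt-assembly sketch (assuming sbt-assembly 0.14+; the shaded
prefix is arbitrary):

// In build.sbt: rewrite com.amazonaws.* class names inside the uberjar
// so the JVM can't accidentally resolve them from EMR's AWS SDK.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.amazonaws.**" -> "shaded.com.amazonaws.@1").inAll
)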

Beyond that, I've run out of ideas to investigate, so I'm hoping someone
here is able to help me.  Thanks in advance for reading this!

Full Stack Trace:

java.lang.NoSuchMethodError: com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()Z
    at com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:35)
    at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.createKinesisClient(AWSUtil.java:50)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:118)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:176)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:188)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:198)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)
