Hi Till,

The cluster was started in HA mode.
I have already patched the Flink interpreter so that it passes the
Configuration to FlinkILoop. Nevertheless, I still have to pass host and port
to FlinkILoop, because its constructor requires them; I retrieve them from
the .yarn-properties file.
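
A minimal sketch of how host and port could be read from that file. The
class name YarnProperties is hypothetical, and the sketch assumes the session
stores the JobManager address under the key "jobManager" in the form
host:port; please check the key against your own .yarn-properties file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper, not part of Flink or Zeppelin. */
class YarnProperties {

    // Assumption: yarn-session writes the address under this key as "host:port".
    static final String JOB_MANAGER_KEY = "jobManager";

    /** Loads the properties file written by the YARN session. */
    static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        }
        return props;
    }

    /** Splits "host:port" at the last colon into an unresolved socket address. */
    static InetSocketAddress jobManagerAddress(Properties props) {
        String value = props.getProperty(JOB_MANAGER_KEY);
        if (value == null) {
            throw new IllegalArgumentException("missing key: " + JOB_MANAGER_KEY);
        }
        int colon = value.lastIndexOf(':');
        if (colon <= 0 || colon == value.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + value);
        }
        String host = value.substring(0, colon).trim();
        int port = Integer.parseInt(value.substring(colon + 1).trim());
        // No DNS lookup here; the caller decides when to resolve the host.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

The interpreter would call YarnProperties.load("/tmp/.yarn-properties-flink")
and hand getHostString() and getPort() of the result to FlinkILoop.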

I logged the Flink Configuration:

INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2}
FlinkInterpreter.java[open]:96) - Flink Configuration: {
recovery.mode=zookeeper, host=yarn,
yarn-properties=/tmp/.yarn-properties-flink,
recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181,
recovery.zookeeper.path.root=/flink/recovery}
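
As a sanity check before the configuration reaches FlinkILoop, something like
the following could confirm that the three HA settings from the log above are
present. This is only a sketch: HaConfigCheck is a hypothetical class, and it
works on a plain Map rather than on Flink's Configuration object so that it
runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check, not part of Flink or Zeppelin. */
class HaConfigCheck {

    // Keys copied from the logged configuration.
    static final String[] REQUIRED_KEYS = {
        "recovery.mode",
        "recovery.zookeeper.quorum",
        "recovery.zookeeper.path.root"
    };

    /** Returns the HA keys that are missing or blank in the given settings. */
    static List<String> missingHaKeys(Map<String, String> settings) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = settings.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** True when recovery.mode is "zookeeper" and no HA key is missing. */
    static boolean isZooKeeperHa(Map<String, String> settings) {
        return "zookeeper".equalsIgnoreCase(settings.get("recovery.mode"))
            && missingHaKeys(settings).isEmpty();
    }
}
```

With the values logged above, missingHaKeys returns an empty list, so on the
client side the configuration looks complete.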

I have also attached some logs:

Error displayed in the Zeppelin paragraph
<https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9>
JobManager log
<https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a>
Interpreter/FlinkILoop log
<https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7>

I was looking at the Flink shell, and it works similarly to the interpreter.
Does it work with an HA cluster?

Thank you,
Andrea


2016-04-14 16:09 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Andrea,
>
> have you started the Flink Yarn cluster in HA mode? Then the job manager
> address is stored in ZooKeeper and you have to tell your FlinkILoop that it
> should retrieve the JobManager address from there. In order to do that you
> have to set conf.setString(ConfigConstants.RECOVERY_MODE,
> "zookeeper"), conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
> "address of your zookeeper cluster") and
> conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY,
> "flink dir you've set") where conf is the flink configuration object. The
> values of these configuration keys must match the values
> specified in the flink-conf.yaml file. You then give the FlinkILoop the
> conf object.
>
> I’m not sure whether you can specify a custom flink configuration in
> Zeppelin. I think you can only specify a host and port. So either you start
> your Flink cluster in non-HA mode or you have to patch Zeppelin.
>
> Cheers,
> Till
>
> On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <andrea.se...@radicalbit.io>
> wrote:
>
> > Hi,
> >
> > I am working on allowing Zeppelin's Flink interpreter to connect to an
> > existing YARN cluster. The YARN cluster was started via yarn-session, and
> > the Flink version is 1.0.0.
> >
> > My approach is to read host and port from .yarn-properties and pass them
> > to FlinkILoop.
> > Now I am facing an issue with the session ID when I submit a paragraph to
> > the YARN cluster.
> > The YARN cluster logs a warning similar to:
> >
> > 2016-04-12 10:14:32,666 WARN  org.apache.flink.yarn.YarnJobManager
> >                  - Discard message
> > LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
> > 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES))
> > because the expected leader session ID
> > Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received
> > leader session ID None.
> >
> > My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException.
> > Could it be due to the missing session ID? Do I need to pass extra
> > parameters to connect correctly to the YARN cluster, or are host and port
> > enough?
> >
> > Thanks in advance,
> > Andrea
> >
>
