Hi Till,

The cluster was started in HA mode. I have already patched the Flink interpreter so that the Configuration can be passed to FlinkILoop. Nevertheless, I still have to pass host and port to FlinkILoop, because its constructor requires them; I retrieve them from the .yarn-properties file.
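For reference, here is a minimal sketch of how host and port can be read from the properties file. It is not the actual interpreter patch: the class and method names are hypothetical, and it assumes the file written by yarn-session contains an entry of the form `jobManager=host:port`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Flink or Zeppelin.
final class YarnPropertiesReader {

    // Assumption: yarn-session stores the JobManager address under this key as "host:port".
    static final String JOB_MANAGER_KEY = "jobManager";

    // Loads the properties file from the given path.
    static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        }
        return props;
    }

    // Splits the "host:port" value into an unresolved socket address (no DNS lookup).
    static InetSocketAddress jobManagerAddress(Properties props) {
        String value = props.getProperty(JOB_MANAGER_KEY);
        if (value == null) {
            throw new IllegalArgumentException("missing key: " + JOB_MANAGER_KEY);
        }
        int sep = value.lastIndexOf(':');
        if (sep <= 0 || sep == value.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + value);
        }
        String host = value.substring(0, sep).trim();
        int port = Integer.parseInt(value.substring(sep + 1).trim());
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) throws IOException {
        // Default path is the yarn-properties value from the logged configuration.
        String path = args.length > 0 ? args[0] : "/tmp/.yarn-properties-flink";
        InetSocketAddress address = jobManagerAddress(load(path));
        System.out.println(address.getHostString() + " " + address.getPort());
    }
}
```

The resulting host and port only satisfy the FlinkILoop constructor; they do not replace the ZooKeeper settings carried in the Configuration.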

I logged the Flink Configuration:

INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2} FlinkInterpreter.java[open]:96) - Flink Configuration: { recovery.mode=zookeeper, host=yarn, yarn-properties=/tmp/.yarn-properties-flink, recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181, recovery.zookeeper.path.root=/flink/recovery}

I have also attached some logs:

Error displayed in the Zeppelin paragraph
<https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9>
JobManager log
<https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a>
Interpreter/FlinkILoop log
<https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7>

I was looking at the Flink shell, which works similarly to the interpreter. Does it work with an HA cluster?

Thank you,
Andrea

2016-04-14 16:09 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Andrea,
>
> have you started the Flink Yarn cluster in HA mode? Then the job manager address is stored in ZooKeeper and you have to tell your FlinkILoop that it should retrieve the JobManager address from there. In order to do that you have to set conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"), conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "address of your zookeeper cluster") and conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "flink dir you've set") where conf is the flink configuration object. The values for the different configuration values must match the values specified in the flink-conf.yaml file. You then give the FlinkILoop the conf object.
>
> I’m not sure whether you can specify a custom flink configuration in Zeppelin. I think you can only specify a host and port. So either you start your Flink cluster in non-HA mode or you have to patch Zeppelin.
>
> Cheers,
> Till
>
> On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>
> > Hi,
> >
> > I am working to allow Zeppelin's flink interpreter to connect to an existing yarn cluster.
> > The yarn cluster was started via yarn-session, and flink's version is 1.0.0.
> >
> > My approach is to read host and port from .yarn-properties and pass them to FlinkILoop.
> > Now I am facing an issue with the session ID when I submit a paragraph to the yarn cluster.
> > The yarn cluster logs a warning similar to:
> >
> > 2016-04-12 10:14:32,666 WARN org.apache.flink.yarn.YarnJobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received leader session ID None.
> >
> > My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException; maybe it is due to the missing session ID? Do I need to pass extra parameters to connect correctly to the yarn cluster, or are host and port enough?
> >
> > Thanks in advance,
> > Andrea