Hi Andrea,

Have you started the Flink YARN cluster in HA mode? If so, the JobManager
address is stored in ZooKeeper, and you have to tell your FlinkILoop to
retrieve the JobManager address from there. To do that, set the following
on conf, the Flink configuration object:

conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper")
conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "address of your zookeeper cluster")
conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "flink dir you've set")

These values must match the ones specified in the flink-conf.yaml file.
Then pass the conf object to the FlinkILoop.
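
Since the client-side values have to agree with flink-conf.yaml, one way to avoid typing them twice is to read them from that file. The sketch below is only an illustration and uses nothing but the JDK: the class name HaClientSettings is made up, the sample values are invented, and the three key names are what I believe Flink 1.0.x uses for the ConfigConstants mentioned above (please check them against your version).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink or Zeppelin.
final class HaClientSettings {

    // Assumed Flink 1.0.x key names behind RECOVERY_MODE,
    // ZOOKEEPER_QUORUM_KEY and ZOOKEEPER_DIR_KEY; verify for your version.
    static final List<String> HA_KEYS = Arrays.asList(
            "recovery.mode",
            "recovery.zookeeper.quorum",
            "recovery.zookeeper.path.root");

    // Picks the HA-related "key: value" entries out of flink-conf.yaml lines.
    static Map<String, String> fromFlinkConf(List<String> lines) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int sep = trimmed.indexOf(": ");
            if (sep < 0) {
                continue; // not a "key: value" line
            }
            String key = trimmed.substring(0, sep).trim();
            String value = trimmed.substring(sep + 2).trim();
            if (HA_KEYS.contains(key)) {
                settings.put(key, value);
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        // Made-up sample content standing in for a real flink-conf.yaml.
        List<String> sample = Arrays.asList(
                "# sample configuration",
                "jobmanager.heap.mb: 1024",
                "recovery.mode: zookeeper",
                "recovery.zookeeper.quorum: zk1:2181,zk2:2181",
                "recovery.zookeeper.path.root: /flink");
        // With Flink on the classpath, each entry would be copied over with
        // conf.setString(key, value) before conf is handed to the FlinkILoop.
        fromFlinkConf(sample).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Splitting on the first ": " keeps values such as a ZooKeeper quorum with host:port pairs intact.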

I’m not sure whether you can specify a custom Flink configuration in
Zeppelin. I think you can only specify a host and port. So either you start
your Flink cluster in non-HA mode or you have to patch Zeppelin.

Cheers,
Till

On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <andrea.se...@radicalbit.io>
wrote:

> Hi,
>
> I am working on allowing Zeppelin's Flink interpreter to connect to an
> existing YARN cluster. The YARN cluster was started via yarn-session and the
> Flink version is 1.0.0.
>
> My approach is to read the host and port from .yarn-properties and pass them
> to FlinkILoop.
> Now I am facing an issue with the session ID when I submit a paragraph to the
> YARN cluster.
> The YARN cluster logs a warning similar to:
>
> 2016-04-12 10:14:32,666 WARN  org.apache.flink.yarn.YarnJobManager
>                  - Discard message
> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
> 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES))
> because the expected leader session ID
> Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received
> leader session ID None.
>
> My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException.
> Could this be due to the missing session ID? Do I need to pass extra
> parameters to connect correctly to the YARN cluster, or are host and port
> enough?
>
> Thanks in advance,
> Andrea
>
