Hi Robert, Hi Till,

I tried to set up the high-availability options in Zeppelin, but I guess it's
just a matter of Flink version compatibility on the Zeppelin side. I'll try to
compile Zeppelin against Flink 1.2 and add the needed parameters to see if it's better.
Thanks for your help!

2017-03-27 15:09 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Maciek and Alexis,
>
> as far as I can tell, it is currently not possible to use Zeppelin
> with a Flink cluster running in HA mode. To make it work, it would
> be necessary either to specify a Flink configuration for the Flink
> interpreter (this is probably the most general solution) or to enable the
> HA mode in Zeppelin. Enabling the HA mode would mean that we set
> 'high-availability: zookeeper' in the configuration and then set all the
> remaining high-availability configuration options [1] to the same values
> with which the Flink cluster was started. This would have to be contributed
> to the Zeppelin project.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#high-availability-ha
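Till's second option amounts to starting the client with the same high-availability settings the cluster was started with. As a minimal sketch, assuming the cluster's flink-conf.yaml uses plain "key: value" lines, the Java class below collects every high-availability* entry so it could be copied into a client-side configuration. HaConfigExtractor and extractHaOptions are hypothetical names, not part of Flink or Zeppelin, and the example values are placeholders; the exact option names differ between Flink versions, so check them against [1].

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: collects the high-availability options from
// flink-conf.yaml-style lines. A simplified "key: value" parser, not Flink's own.
class HaConfigExtractor {

    // Returns every entry whose key starts with "high-availability",
    // in file order. Blank lines and '#' comments are skipped.
    static Map<String, String> extractHaOptions(List<String> confLines) {
        Map<String, String> ha = new LinkedHashMap<>();
        for (String raw : confLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(':'); // keys contain no ':', values may (host:port)
            if (sep <= 0) {
                continue; // not a "key: value" line
            }
            String key = line.substring(0, sep).trim();
            String value = line.substring(sep + 1).trim();
            if (key.startsWith("high-availability")) {
                ha.put(key, value);
            }
        }
        return ha;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the thread.
        List<String> conf = Arrays.asList(
            "jobmanager.rpc.address: jm-1",
            "high-availability: zookeeper",
            "high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181",
            "high-availability.zookeeper.path.root: /flink",
            "# taskmanager settings omitted");
        // Prints the three high-availability entries, one per line.
        extractHaOptions(conf).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

This only gathers the values; getting them into the Flink interpreter still requires the Zeppelin-side support described in the message.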
>
> Cheers,
> Till
>
> On Thu, Mar 23, 2017 at 11:41 AM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Alexis,
>>
>> did you set the ZooKeeper configuration for Flink in Zeppelin?
>>
>> On Mon, Mar 20, 2017 at 11:37 AM, Alexis Gendronneau <
>> a.gendronn...@gmail.com> wrote:
>>
>>> Hello users,
>>>
>>> Like Maciek, I'm currently trying to make Apache Zeppelin 0.7 work with
>>> Flink. I have two versions of Flink available (1.1.2 and 1.2.0). Each one
>>> is running in high-availability mode.
>>>
>>> When running jobs from Zeppelin in Flink local mode, everything works
>>> fine. But when trying to submit a job to a remote host (no matter which
>>> version is involved), the job is stuck in the submission phase until it
>>> reaches akka.client.timeout.
>>>
>>> I tried to increase the timeout (as suggested by the error raised in
>>> Zeppelin), but it only increases the time before the error is finally
>>> raised (tested with 600s).
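Why a larger timeout only delays the failure can be shown with a simplified Java model (not Flink's JobClientActor; the class and method names are hypothetical): if the JobManager discards the submission, as its log suggests, no acknowledgement ever arrives, so any finite timeout merely decides when the client gives up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified model, not Flink code: the client waits for a submission
// acknowledgement that the (discarding) JobManager never sends.
class SubmissionTimeoutModel {

    // Returns true if the acknowledgement arrived within the timeout.
    static boolean waitForAck(CompletableFuture<Void> ack, long timeoutMillis) {
        try {
            ack.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // no acknowledgement within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false; // submission failed outright
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> neverAcked = new CompletableFuture<>(); // nobody completes it
        for (long timeoutMillis : new long[] {50, 200}) {
            long start = System.nanoTime();
            boolean acked = waitForAck(neverAcked, timeoutMillis);
            long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            // acked is false both times; only the waiting time grows.
            System.out.println("timeout=" + timeoutMillis + "ms acked=" + acked + " waited~" + waited + "ms");
        }
    }
}
```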
>>>
>>> On the Flink side, nothing appears except:
>>>
>>>     2017-03-20 11:19:31,675 WARN  org.apache.flink.runtime.jobmanager.JobManager
>>>     - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>     8af3a91762a171b04c4be0efe540f3d4),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>     because the expected leader session ID
>>>     Some(f955760c-d80d-4992-a148-5968026ca6e4)
>>>     did not equal the received leader session ID None.
>>>
>>>
>>> On the Zeppelin interpreter side, we get the following stack trace:
>>>
>>>     bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount] = org.apache.flink.api.scala.DataSet@669fc812
>>>     org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
>>>       at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
>>>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
>>>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
>>>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
>>>       at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
>>>       at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
>>>       at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
>>>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>>>       at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
>>>       at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
>>>       ... 36 elided
>>>     Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>>       at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
>>>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
>>>       ... 46 more
>>>     Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>>       at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
>>>       at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
>>>       at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
>>>       at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>       at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> It looks like we have to add parameters on the Zeppelin side, but I can't
>>> see what's missing here. Any clue is appreciated.
>>>
>>> Regards,
>>>
>>> 2017-01-24 17:13 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> +Till Rohrmann <trohrm...@apache.org>, do you know what can be used to
>>>> access an HA cluster from that setting?
>>>>
>>>> Adding Till since he probably knows the HA stuff best.
>>>>
>>>> On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak <m...@touk.pl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a standalone Flink cluster configured with the HA setting (i.e.
>>>>> with ZooKeeper recovery). How should I access it remotely, e.g. with a
>>>>> Zeppelin notebook or the Scala shell?
>>>>>
>>>>> There are settings for host/port, but with the HA setting they are not
>>>>> fixed: if I look up the host and port of the *current leader* and set
>>>>> those, I get an exception on the JobManager:
>>>>>
>>>>> 20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
>>>>> o.a.f.runtime.jobmanager.JobManager - Discard message
>>>>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>>> 02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>>> because the expected leader session ID
>>>>> Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
>>>>> leader session ID None.
>>>>>
>>>>> I guess it's reasonable behaviour, since I should use the appropriate
>>>>> LeaderRetrievalService and so on. But apparently there's no such
>>>>> possibility in the Scala Flink shell?
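The mismatch described here can be illustrated with a simplified Java model, assuming the JobManager simply compares the session ID it was granted as leader with the one attached to each incoming message. LeaderSessionModel and LeaderInfo are hypothetical; Flink's real LeaderRetrievalService and message classes look different. A client configured with a fixed host/port attaches no ID (the "None" in the log), while a client that first retrieves the current leader can attach the expected one.

```java
import java.util.Objects;
import java.util.UUID;

// Simplified model, not Flink code: the JobManager only handles messages
// tagged with the leader session ID it was granted.
class LeaderSessionModel {

    // Hypothetical stand-in for what a leader retrieval service would
    // return (e.g. from ZooKeeper): the leader's address plus its session ID.
    static final class LeaderInfo {
        final String address;
        final UUID sessionId;

        LeaderInfo(String address, UUID sessionId) {
            this.address = address;
            this.sessionId = sessionId;
        }
    }

    private final UUID expectedSessionId;

    LeaderSessionModel(UUID expectedSessionId) {
        this.expectedSessionId = expectedSessionId;
    }

    // A null received ID corresponds to "None" in the log message.
    boolean accepts(UUID receivedSessionId) {
        return Objects.equals(expectedSessionId, receivedSessionId);
    }

    public static void main(String[] args) {
        UUID granted = UUID.randomUUID();
        LeaderSessionModel jobManager = new LeaderSessionModel(granted);
        LeaderInfo retrieved = new LeaderInfo("jm-1:6123", granted); // placeholder address

        // Client configured with host/port only: no session ID attached.
        System.out.println("fixed host/port client accepted: " + jobManager.accepts(null));
        // Client that looked up the leader first attaches the expected ID.
        System.out.println("client for " + retrieved.address + " accepted: "
            + jobManager.accepts(retrieved.sessionId));
    }
}
```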
>>>>>
>>>>> Is it a missing feature? I can prepare a patch, but I'm not sure how I
>>>>> would hook the behaviour of ClusterClient into FlinkILoop.
>>>>>
>>>>> thanks,
>>>>>
>>>>> maciek
>>>>>
>>>>>
>>>
>>>
>>> --
>>> Alexis Gendronneau
>>>
>>> alexis.gendronn...@corp.ovh.com
>>> a.gendronn...@gmail.com
>>>
>>
>>
>


-- 
Alexis Gendronneau

alexis.gendronn...@corp.ovh.com
a.gendronn...@gmail.com
