Hi Andrea,

I think your problem should be fixed by the PRs [1, 2]. I've tested it
locally on my yarn cluster and it worked.

[1] https://github.com/apache/flink/pull/1904
[2] https://github.com/apache/flink/pull/1914

Cheers,
Till

On Tue, Apr 19, 2016 at 2:16 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> I think this is another issue you’ve detected. I already spotted some
> suspicious code in the yarn deployment section. If I’m not mistaken, then
> flink-conf.yaml is read too late and is, thus, not respected. I’ll verify
> it and if valid, then I’ll open another issue and fix it.
>
> Thanks for your patience and thorough reporting. It helps a lot :-)
>
> Cheers,
> Till
> ​
>
> On Tue, Apr 19, 2016 at 2:12 PM, Andrea Sella <andrea.se...@radicalbit.io>
> wrote:
>
>> No, I tried it via the scala-shell, as you can see in the attachment.
>>
>> Regards,
>> Andrea
>>
>> 2016-04-19 14:08 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>
>>> Hi Andrea,
>>>
>>> thanks for testing it. How did you submit the job this time? Via
>>> Zeppelin?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Apr 19, 2016 at 12:51 PM, Andrea Sella <
>>> andrea.se...@radicalbit.io> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I've used your branch fixScalaShell to test the scala-shell with our HA
>>>> cluster, but it doesn't work. Same error as before:
>>>>
>>>> 2016-04-19 06:40:35,030 WARN  org.apache.flink.yarn.YarnJobManager
>>>>                      - Discard message
>>>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>> aa5b034e10a850d863642a24aab75d9c),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>> because the expected leader session ID
>>>> Some(bc706707-2bab-4b82-b7a7-1426dce696a7) did not equal the received
>>>> leader session ID None.
>>>>
>>>> If I submit a simple job, it works, so I don't think it is a problem
>>>> with our environment.
>>>>
>>>> Cheers,
>>>> Andrea
>>>>
>>>> 2016-04-18 18:41 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>
>>>>> Cool, that helps a lot :-)
>>>>>
>>>>> On Mon, Apr 18, 2016 at 6:06 PM, Andrea Sella <
>>>>> andrea.se...@radicalbit.io> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>
>>>>>> Don't worry, I am going to test the PR in our HA environment.
>>>>>>
>>>>>> Cheers,
>>>>>> Andrea
>>>>>>
>>>>>>
>>>>>> 2016-04-18 17:46 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>
>>>>>>> Hi Andrea,
>>>>>>>
>>>>>>> sorry, I saw your mail too late. I already fixed the problem and
>>>>>>> opened a PR [1] for it. I hope you haven't invested too much time in
>>>>>>> it yet.
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/pull/1904
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, Apr 18, 2016 at 11:19 AM, Andrea Sella <
>>>>>>> andrea.se...@radicalbit.io> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>> Thanks for the support. I will take the issue and start working on
>>>>>>>> it asap.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Andrea
>>>>>>>>
>>>>>>>> 2016-04-18 10:32 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>
>>>>>>>>> Hi Andrea,
>>>>>>>>>
>>>>>>>>> I think the problem is simply that it has not been correctly
>>>>>>>>> implemented. I just checked and I think the user configuration is
>>>>>>>>> not given to the PlanExecutor which is internally created. I've
>>>>>>>>> opened an issue for that [1].
>>>>>>>>>
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-3774
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>> ​
>>>>>>>>>
>>>>>>>>> On Fri, Apr 15, 2016 at 4:58 PM, Andrea Sella <
>>>>>>>>> andrea.se...@radicalbit.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Till,
>>>>>>>>>>
>>>>>>>>>> I've tried the Scala-Shell with our HA cluster, no luck again.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Andrea
>>>>>>>>>>
>>>>>>>>>> 2016-04-15 14:43 GMT+02:00 Andrea Sella <
>>>>>>>>>> andrea.se...@radicalbit.io>:
>>>>>>>>>>
>>>>>>>>>>> Hi Till,
>>>>>>>>>>>
>>>>>>>>>>> I am using a branched version of 1.0.1 where I cherry-picked
>>>>>>>>>>> FLINK-2935
>>>>>>>>>>> <https://github.com/radicalbit/flink/commit/dfbbb9e48c98b486baf279c396d1bf7de31c1f8c>
>>>>>>>>>>> to use FlinkILoop with a Configuration. My Flink interpreter is here
>>>>>>>>>>> <https://github.com/radicalbit/incubator-zeppelin/blob/flink-yarn-interpreter/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java>;
>>>>>>>>>>> I started tweaking it just two days ago. As far as I can see, there
>>>>>>>>>>> is a Zeppelin issue
>>>>>>>>>>> <https://issues.apache.org/jira/browse/ZEPPELIN-664> to make the
>>>>>>>>>>> FlinkInterpreter work with Yarn, and I need that too.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Andrea
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2016-04-15 14:20 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Andrea,
>>>>>>>>>>>>
>>>>>>>>>>>> which version of Flink are you using with Zeppelin? How do you
>>>>>>>>>>>> pass the Flink configuration to the FlinkILoop? Could you maybe
>>>>>>>>>>>> show me your version of Zeppelin (the code)?
>>>>>>>>>>>>
>>>>>>>>>>>> According to the log, the ScalaShellRemoteEnvironment didn't
>>>>>>>>>>>> get the Flink configuration with the HA settings. Therefore, it 
>>>>>>>>>>>> still tries
>>>>>>>>>>>> to connect to the jobmanager specified by the host and port 
>>>>>>>>>>>> values. The
>>>>>>>>>>>> functionality to pass in a Flink configuration object to 
>>>>>>>>>>>> FlinkILoop has
>>>>>>>>>>>> only been merged recently. You might have to switch to the 
>>>>>>>>>>>> 1.1-SNAPSHOT
>>>>>>>>>>>> version for that. This means that you would have to update the 
>>>>>>>>>>>> Flink
>>>>>>>>>>>> version in your Zeppelin branch to 1.1-SNAPSHOT to make it work.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 15, 2016 at 1:03 PM, Andrea Sella <
>>>>>>>>>>>> andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for following up on this issue with me :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here are the logs
>>>>>>>>>>>>> <https://gist.github.com/alkagin/663fae1fc2993f0acd3ba66697f14093>;
>>>>>>>>>>>>> are they enough?
>>>>>>>>>>>>>
>>>>>>>>>>>>> As I wrote in the previous mail, you can also see the
>>>>>>>>>>>>> Configuration in the logs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Andrea
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2016-04-15 10:07 GMT+02:00 Till Rohrmann <trohrm...@apache.org
>>>>>>>>>>>>> >:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In HA mode, the host and port information you provide to the
>>>>>>>>>>>>>> Shell should
>>>>>>>>>>>>>> be simply ignored. So you don't have to retrieve them from the
>>>>>>>>>>>>>> .yarn-properties file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could you maybe run the FlinkInterpreter with debug log level
>>>>>>>>>>>>>> and share the
>>>>>>>>>>>>>> logs with me? You can also do that privately, if you don't
>>>>>>>>>>>>>> want to share
>>>>>>>>>>>>>> them on the mailing list.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I haven't tried it myself, but I thought that the Shell also
>>>>>>>>>>>>>> works with an
>>>>>>>>>>>>>> HA cluster, because it uses the same mechanism as the CLI,
>>>>>>>>>>>>>> for example.
>>>>>>>>>>>>>> I'll try it out later today.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Apr 15, 2016 at 12:22 AM, Andrea Sella <
>>>>>>>>>>>>>> andrea.se...@radicalbit.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Hi Till,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > The cluster was started in HA mode.
>>>>>>>>>>>>>> > I already patched the Flink interpreter to allow passing the
>>>>>>>>>>>>>> > Configuration to FlinkILoop. Nevertheless, I still have to pass
>>>>>>>>>>>>>> > host and port to FlinkILoop; they are required by the FlinkILoop
>>>>>>>>>>>>>> > constructor and I retrieve them from the .yarn-properties file.
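>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > A minimal sketch of what I mean (the JobManager property key
>>>>>>>>>>>>>> > and the exact FlinkILoop constructor are assumptions here, not
>>>>>>>>>>>>>> > the actual interpreter code):
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > import java.io.FileInputStream
>>>>>>>>>>>>>> > import java.util.Properties
>>>>>>>>>>>>>> > import org.apache.flink.api.scala.FlinkILoop
>>>>>>>>>>>>>> > import org.apache.flink.configuration.Configuration
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > // Load the properties file written by the yarn-session client.
>>>>>>>>>>>>>> > val props = new Properties()
>>>>>>>>>>>>>> > props.load(new FileInputStream("/tmp/.yarn-properties-flink"))
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > // "jobManager" is a placeholder key name, not necessarily the real one.
>>>>>>>>>>>>>> > val Array(host, port) = props.getProperty("jobManager").split(":")
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > // The HA settings would go into this Configuration object.
>>>>>>>>>>>>>> > val conf = new Configuration()
>>>>>>>>>>>>>> > // Constructor taking a Configuration, as added by FLINK-2935.
>>>>>>>>>>>>>> > val iloop = new FlinkILoop(host, port.toInt, conf, None)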
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I logged Flink Configuration:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2}
>>>>>>>>>>>>>> > FlinkInterpreter.java[open]:96) - Flink Configuration: {
>>>>>>>>>>>>>> > recovery.mode=zookeeper, host=yarn,
>>>>>>>>>>>>>> > yarn-properties=/tmp/.yarn-properties-flink,
>>>>>>>>>>>>>> > recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181,
>>>>>>>>>>>>>> > recovery.zookeeper.path.root=/flink/recovery}
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > and I attach some logs:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Error displayed in paragraph of Zeppelin
>>>>>>>>>>>>>> > <https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9>
>>>>>>>>>>>>>> > JobManager log
>>>>>>>>>>>>>> > <https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a>
>>>>>>>>>>>>>> > Interpreter/FlinkILoop log
>>>>>>>>>>>>>> > <https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7>
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I was looking at the Flink shell and it works similarly to the
>>>>>>>>>>>>>> > interpreter; does it work with an HA cluster?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thank you,
>>>>>>>>>>>>>> > Andrea
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > 2016-04-14 16:09 GMT+02:00 Till Rohrmann <
>>>>>>>>>>>>>> trohrm...@apache.org>:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > > Hi Andrea,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > have you started the Flink Yarn cluster in HA mode? Then the
>>>>>>>>>>>>>> > > job manager address is stored in ZooKeeper and you have to
>>>>>>>>>>>>>> > > tell your FlinkILoop that it should retrieve the JobManager
>>>>>>>>>>>>>> > > address from there. In order to do that you have to set
>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"),
>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "address
>>>>>>>>>>>>>> > > of your zookeeper cluster") and
>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "flink dir
>>>>>>>>>>>>>> > > you've set"), where conf is the Flink configuration object.
>>>>>>>>>>>>>> > > The values for the different configuration keys must match
>>>>>>>>>>>>>> > > the values specified in the flink-conf.yaml file. You then
>>>>>>>>>>>>>> > > give the FlinkILoop the conf object.
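>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > A minimal sketch (the ZooKeeper quorum and recovery path below
>>>>>>>>>>>>>> > > are just example values, and the FlinkILoop constructor may
>>>>>>>>>>>>>> > > look different in your branch):
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > import org.apache.flink.api.scala.FlinkILoop
>>>>>>>>>>>>>> > > import org.apache.flink.configuration.{ConfigConstants, Configuration}
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > val conf = new Configuration()
>>>>>>>>>>>>>> > > // These values must match the flink-conf.yaml of the yarn-session.
>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper")
>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
>>>>>>>>>>>>>> > >   "slave01:2181,slave02:2181,master:2181")
>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "/flink/recovery")
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > // host/port as you pass them today; with HA the JobManager
>>>>>>>>>>>>>> > > // address is resolved via ZooKeeper anyway.
>>>>>>>>>>>>>> > > val host = "localhost"  // placeholder
>>>>>>>>>>>>>> > > val port = 6123         // placeholder
>>>>>>>>>>>>>> > > val iloop = new FlinkILoop(host, port, conf, None)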
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > I'm not sure whether you can specify a custom Flink
>>>>>>>>>>>>>> > > configuration in Zeppelin. I think you can only specify a
>>>>>>>>>>>>>> > > host and port. So either you start your Flink cluster in
>>>>>>>>>>>>>> > > non-HA mode or you have to patch Zeppelin.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Cheers,
>>>>>>>>>>>>>> > > Till
>>>>>>>>>>>>>> > > ​
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <
>>>>>>>>>>>>>> > andrea.se...@radicalbit.io>
>>>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > > Hi,
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > I am working to allow Zeppelin's flink interpreter to
>>>>>>>>>>>>>> > > > connect to an existing yarn cluster. The yarn cluster was
>>>>>>>>>>>>>> > > > started via yarn-session and Flink's version is 1.0.0.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > My approach is to read host and port from .yarn-properties
>>>>>>>>>>>>>> > > > and pass them to FlinkILoop.
>>>>>>>>>>>>>> > > > Now I am facing an issue with the session ID when I submit a
>>>>>>>>>>>>>> > > > paragraph to the yarn cluster.
>>>>>>>>>>>>>> > > > The yarn cluster throws a warning similar to:
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > 2016-04-12 10:14:32,666 WARN  org.apache.flink.yarn.YarnJobManager
>>>>>>>>>>>>>> > > >                  - Discard message
>>>>>>>>>>>>>> > > > LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>>>>>>>>>>>> > > > 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>>>>>>>>>>>> > > > because the expected leader session ID
>>>>>>>>>>>>>> > > > Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the
>>>>>>>>>>>>>> > > > received leader session ID None.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > My Zeppelin paragraph throws a
>>>>>>>>>>>>>> > > > JobClientActorSubmissionTimeoutException; maybe it is due to
>>>>>>>>>>>>>> > > > the missing session ID? Do I need to pass extra params to
>>>>>>>>>>>>>> > > > connect correctly to the yarn cluster, or are host and port
>>>>>>>>>>>>>> > > > enough?
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > Thanks in advance,
>>>>>>>>>>>>>> > > > Andrea
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
