Although not directly related to the issue described in this thread, I
think that if we followed the Portable Jar approach described in
https://lists.apache.org/thread.html/2122928a0a5f678d475ec15af538eb7303f73557870af174b1fdef7e@%3Cdev.beam.apache.org%3E
we could solve this problem simply by removing the dependency on the job
server for starting Beam jobs.

On Fri, Aug 16, 2019 at 9:37 AM Thomas Weise <[email protected]> wrote:

> In our current deployment this is solved by a wrapper script that checks
> the Flink REST API and waits until the job has started or a timeout is
> reached.
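>
> A minimal sketch of such a wrapper in Python (the REST address, job name,
> and timeout below are assumptions for illustration, not our actual
> script):
>
>     import time
>     import requests
>
>     FLINK_REST = "http://localhost:8081"  # hypothetical JobManager address
>
>     def wait_for_job(job_name, timeout_s=120, poll_s=2):
>         """Poll the Flink REST API until a job named job_name is RUNNING."""
>         deadline = time.time() + timeout_s
>         while time.time() < deadline:
>             jobs = requests.get(FLINK_REST + "/jobs/overview").json()["jobs"]
>             for job in jobs:
>                 if job["name"] == job_name and job["state"] == "RUNNING":
>                     return job["jid"]
>             time.sleep(poll_s)
>         raise TimeoutError("job %s did not reach RUNNING in time" % job_name)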
>
> Even considering the current Flink client API restrictions, the hack in
> JobInvocation is bad. The status should be set to running by the
> executor/runner, where this can be handled according to how the
> respective client API works.
>
> Thomas
>
> On Fri, Aug 16, 2019 at 7:46 AM enrico canzonieri <[email protected]>
> wrote:
>
>> I agree that the Job status is better checked from Flink itself. Our
>> model is similar to what Thomas described in the design docs shared with
>> this mailing list to run Beam on Flink in K8s. We launch a job server just
>> to submit the Beam job and then we start the job monitoring directly from
>> Flink.
>>
>> The problem I'm facing is that when submitting the Beam job, the Python
>> process terminates immediately after calling pipeline.run(). At that
>> moment the job is not yet running on Flink and doesn't even appear in the
>> Flink Job Manager. Our operator expects the job to appear in Flink
>> immediately after calling run (this is the behavior when using the Flink
>> CLI), so the operator believes the job hasn't started yet (or that
>> submission has failed) and tries to launch a new one. The end result is
>> that within a few seconds we have two jobs running instead of one.
>>
>> Looking at the code, I think that calling pipeline.run() should return
>> only when the job has actually transitioned to a state other than
>> STARTING. I think it's OK for the method to hang if the job can't
>> transition successfully to any of the other states (RUNNING, DONE,
>> FAILED, CANCELLED). E.g., if there are not enough resources in the
>> cluster, it's OK for the job to be stuck in STARTING and eventually
>> (after some timeout) transition to FAILED.
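>>
>> As a sketch of the semantics I have in mind (names approximate; this
>> assumes the portable runner's PipelineResult exposes the job state and
>> that PipelineState carries the STARTING constant):
>>
>>     import time
>>     from apache_beam.runners.runner import PipelineState
>>
>>     def run_and_wait_started(pipeline, timeout_s=60, poll_s=1):
>>         """Return once the job has left STARTING, or fail on timeout."""
>>         result = pipeline.run()
>>         deadline = time.time() + timeout_s
>>         while result.state == PipelineState.STARTING:
>>             if time.time() > deadline:
>>                 raise TimeoutError("job stuck in STARTING")
>>             time.sleep(poll_s)
>>         return result  # now RUNNING, DONE, FAILED, or CANCELLED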
>>
>> In our first iteration the workaround was to use a longer sleep to wait
>> for the job to potentially appear in Flink. I may be missing a better
>> workaround here, so please feel free to recommend better options.
>>
>> Cheers,
>> Enrico
>>
>> On Fri, Aug 16, 2019 at 6:57 AM Thomas Weise <[email protected]> wrote:
>>
>>> This gets into the gory details of the Flink client API, and there is
>>> plenty of room for improvement. There is a relevant discussion happening
>>> on the Flink dev list right now.
>>>
>>> But before delving into that and what a workaround could look like in
>>> Beam, do you really want to rely on the status from the Beam job server?
>>> In our deployment we only use the job server to submit the pipeline and
>>> then terminate it. Any monitoring is based on Flink itself.
>>>
>>> You can obtain the job status through the Flink REST API, and the
>>> metrics through Flink as well. To assert that a job was successfully
>>> launched, it isn't sufficient to check that it is RUNNING: there is
>>> still the possibility that it never really starts, due to resource
>>> issues, recovery from a savepoint not completing, and so on. To get an
>>> accurate "LAUNCHED OK" signal, it is necessary to check Flink metrics.
>>>
>>> Cheers,
>>> Thomas
>>>
>>>
>>>
>>> On Fri, Aug 16, 2019 at 3:03 AM Maximilian Michels <[email protected]>
>>> wrote:
>>>
>>>> Hi Enrico,
>>>>
>>>> There is an old ticket for this:
>>>> https://jira.apache.org/jira/browse/BEAM-593 It hasn't been prioritized
>>>> because submitting applications "detached" is still possible using the
>>>> Flink CLI or in portable pipelines. That comes with the drawbacks you
>>>> explained, e.g. inaccurate pipeline status and metrics retrieval.
>>>>
>>>> Things have since improved with respect to async client communication,
>>>> with the introduction of the RestClusterClient. The tricky part is
>>>> still to make this work across all deployment scenarios (local/remote).
>>>> I think there are two options:
>>>>
>>>> 1) Wrap the entire blocking execution of the Flink API, propagate
>>>> errors to the Beam application, and update its status (see the sketch
>>>> after this list)
>>>>
>>>> 2) Retrieve the JobGraph from the batch/streaming API. Submit the
>>>> JobGraph with the RestClusterClient to remote clusters. Spin up a Flink
>>>> MiniCluster in case of local execution.
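>>>>
>>>> As a rough sketch of option 1 in Python terms (the set_state hook is
>>>> hypothetical; in Beam this logic would live in the runner's job
>>>> invocation layer rather than in user code):
>>>>
>>>>     from concurrent.futures import ThreadPoolExecutor
>>>>
>>>>     def submit_async(blocking_execute, set_state):
>>>>         """Run the blocking Flink execution on a background thread and
>>>>         fold its outcome into the Beam job state."""
>>>>         def run():
>>>>             try:
>>>>                 blocking_execute()  # returns only when the job finishes
>>>>                 set_state("DONE")
>>>>             except Exception:       # propagate failures to the Beam app
>>>>                 set_state("FAILED")
>>>>                 raise
>>>>         return ThreadPoolExecutor(max_workers=1).submit(run)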
>>>>
>>>> On this note, for the next Flink version, there is a plan to add support
>>>> for submitting jobs async directly from the ExecutionEnvironment.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On 16.08.19 01:22, enrico canzonieri wrote:
>>>> > Hello,
>>>> >
>>>> > while launching Flink jobs using the PortableRunner, I noticed that
>>>> > jobs are marked by Beam as state RUNNING before actually running on
>>>> > Flink. The issue doesn't seem specific to the Flink Runner though:
>>>> > https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90
>>>> >
>>>> > I'd assume the TODO in the code is referring exactly to this issue.
>>>> > For Flink specifically, I guess that one of the problems is that jobs
>>>> > are submitted using a blocking API from the
>>>> > BeamFlinkRemoteStreamEnvironment
>>>> > <https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361>.
>>>> > So we essentially never return from the job submission unless the job
>>>> > is completed (never, for streaming), cancelled, or failed. A possible
>>>> > solution is to set the Flink client to detached mode and return a
>>>> > JobSubmissionResult instead of a JobExecutionResult, and then have a
>>>> > Future or event loop that tracks the actual job execution and changes
>>>> > the job state accordingly, as in the sketch below.
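>>>> >
>>>> > Roughly, the tracking loop could map Flink's job status onto Beam's
>>>> > states like this (get_flink_state and set_beam_state are hypothetical
>>>> > helpers, e.g. backed by the /jobs/<jid> REST call):
>>>> >
>>>> >     import time
>>>> >
>>>> >     TERMINAL = {"FINISHED": "DONE", "FAILED": "FAILED",
>>>> >                 "CANCELED": "CANCELLED"}
>>>> >
>>>> >     def track(jid, get_flink_state, set_beam_state, poll_s=2):
>>>> >         """Drive the Beam state machine from the detached job."""
>>>> >         while True:
>>>> >             state = get_flink_state(jid)
>>>> >             if state == "RUNNING":
>>>> >                 set_beam_state("RUNNING")
>>>> >             elif state in TERMINAL:
>>>> >                 set_beam_state(TERMINAL[state])
>>>> >                 return
>>>> >             time.sleep(poll_s)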
>>>> >
>>>> > Playing around with the code, it seems that this would be possible,
>>>> > but it could require a large code change, possibly including
>>>> > duplicating the entire Flink RemoteStreamEnvironment
>>>> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java>
>>>> > inside Beam to customize it even more than it already is.
>>>> >
>>>> > Is there any ticket tracking this already?
>>>> >
>>>> > Cheers,
>>>> > Enrico
>>>>
>>>
