Although it is not directly related to the issue described in this thread, I think that if we followed the Portable Jar approach described in https://lists.apache.org/thread.html/2122928a0a5f678d475ec15af538eb7303f73557870af174b1fdef7e@%3Cdev.beam.apache.org%3E we could solve this issue by simply removing the dependency on the job server for starting up Beam jobs.
On Fri, Aug 16, 2019 at 9:37 AM Thomas Weise <[email protected]> wrote:

> In our current deployment this is solved by a wrapper script that checks
> the Flink REST API and waits until the job has started or a timeout is
> reached.
>
> Even considering the current Flink client API restrictions, the hack in
> JobInvocation is bad. The status should be set to running by the
> executor/runner, and there it can be handled depending on how the
> respective client API works.
>
> Thomas
>
> On Fri, Aug 16, 2019 at 7:46 AM enrico canzonieri <[email protected]> wrote:
>
>> I agree that the job status is better checked from Flink itself. Our
>> model is similar to what Thomas described in the design docs shared with
>> this mailing list for running Beam on Flink in K8s. We launch a job
>> server just to submit the Beam job and then monitor the job directly
>> from Flink.
>>
>> The problem I'm facing is that when submitting the Beam job, the Python
>> process terminates immediately after calling pipeline.run(). At that
>> moment the job is not yet running on Flink and doesn't even appear in
>> the Flink Job Manager. Our operator expects the job to appear in Flink
>> immediately after calling run (this is the behavior when using the Flink
>> CLI), so the operator believes the job hasn't started yet (or that
>> submission has failed) and tries to launch a new one. The end result is
>> that within a few seconds we have two jobs running instead of one.
>>
>> Looking at the code, I think that calling pipeline.run() should return
>> only when the job has actually transitioned to a state other than
>> STARTING. I think it's ok for the method to hang if the job can't
>> transition successfully to any of the other states (RUNNING, DONE,
>> FAILED, CANCELLED). E.g. in case there are not enough resources in the
>> cluster, it's ok for the job to be stuck in STARTING and eventually
>> (after some timeout) transition to FAILED.
>>
>> In our first iteration the workaround was to use a longer sleep to wait
>> for the job to potentially appear in Flink. I may be missing a better
>> workaround here, so please feel free to recommend better options.
>>
>> Cheers,
>> Enrico
>>
>> On Fri, Aug 16, 2019 at 6:57 AM Thomas Weise <[email protected]> wrote:
>>
>>> This gets into the gory details of the Flink client API, and there is
>>> plenty of room for improvement. There is relevant discussion happening
>>> on the Flink dev list right now.
>>>
>>> But before delving into that and what a workaround could look like in
>>> Beam, do you really want to rely on the status from the Beam job
>>> server? In our deployment we only use the job server to submit the
>>> pipeline and then terminate it. Any monitoring is based on Flink
>>> itself.
>>>
>>> You can obtain the job status through the Flink REST API and the
>>> metrics through Flink as well. To assert that a job was successfully
>>> launched, it isn't sufficient to check that it is RUNNING in any case.
>>> There is still the possibility that it never really starts due to
>>> resource issues, recovery from a savepoint not completing, and so on.
>>> To get an accurate "LAUNCHED OK" signal, it is necessary to check
>>> Flink metrics.
>>>
>>> Cheers,
>>> Thomas
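As an illustration of the wrapper-script approach Thomas describes above, here is a minimal sketch that polls Flink's REST API (the /jobs/overview endpoint) until a job with a given name reports state RUNNING or a timeout expires. The timeout values and the naive string matching are assumptions made for the sketch, not anything prescribed in this thread:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;

// Hypothetical wrapper that waits for a Flink job to report RUNNING.
public class WaitForFlinkJob {

  public static void main(String[] args) throws Exception {
    String restBase = args.length > 0 ? args[0] : "http://localhost:8081";
    String jobName = args.length > 1 ? args[1] : "my-beam-job";
    long deadline = System.currentTimeMillis() + 120_000; // two-minute timeout

    while (System.currentTimeMillis() < deadline) {
      // GET /jobs/overview returns a JSON list of jobs with name and state.
      HttpURLConnection conn =
          (HttpURLConnection) new URL(restBase + "/jobs/overview").openConnection();
      String body;
      try (BufferedReader reader =
          new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
        body = reader.lines().collect(Collectors.joining());
      }
      // Naive check to keep the sketch dependency-free; a real script would
      // parse the JSON, match the job name exactly, and inspect its "state".
      if (body.contains("\"name\":\"" + jobName + "\"")
          && body.contains("\"state\":\"RUNNING\"")) {
        System.out.println("Job " + jobName + " is RUNNING.");
        return;
      }
      Thread.sleep(2_000); // poll every two seconds
    }
    System.err.println("Timed out waiting for job " + jobName);
    System.exit(1);
  }
}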
>>> On Fri, Aug 16, 2019 at 3:03 AM Maximilian Michels <[email protected]> wrote:
>>>
>>>> Hi Enrico,
>>>>
>>>> There is an old ticket for this:
>>>> https://jira.apache.org/jira/browse/BEAM-593 It hasn't been
>>>> prioritized because submitting applications "detached" is still
>>>> possible using the Flink CLI or in portable pipelines. It comes with
>>>> some drawbacks that you explained, e.g. inaccurate pipeline status
>>>> and metrics retrieval.
>>>>
>>>> Things have improved since then with respect to async client
>>>> communication with the introduction of RestClusterClient. The tricky
>>>> part is still to make this work across all deployment scenarios
>>>> (local/remote). I think there are two options:
>>>>
>>>> 1) Wrap the entire blocking execution of the Flink API, simply
>>>> propagate errors to the Beam application, and update its status
>>>>
>>>> 2) Retrieve the JobGraph from the batch/streaming API. Submit the
>>>> JobGraph with the RestClusterClient to remote clusters. Spin up a
>>>> Flink MiniCluster in case of local execution.
>>>>
>>>> On this note, for the next Flink version there is a plan to add
>>>> support for submitting jobs async directly from the
>>>> ExecutionEnvironment.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On 16.08.19 01:22, enrico canzonieri wrote:
>>>> > Hello,
>>>> >
>>>> > While launching Flink jobs using the PortableRunner, I noticed that
>>>> > jobs are marked by Beam as state RUNNING before actually running on
>>>> > Flink. The issue doesn't seem specific to the Flink runner though:
>>>> > https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90
>>>> >
>>>> > I'd assume the TODO in the code is referring exactly to this issue.
>>>> > For Flink specifically, I guess that one of the problems is that
>>>> > jobs are submitted using a blocking API from the
>>>> > BeamFlinkRemoteStreamEnvironment
>>>> > <https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361>.
>>>> > So we essentially never return from the job submission unless the
>>>> > job is completed (never for streaming), cancelled, or failed. A
>>>> > possible solution to this is to set the Flink client as detached,
>>>> > return a JobSubmissionResult instead of a JobExecutionResult, and
>>>> > then have a Future or event loop that tracks the actual job
>>>> > execution and changes the job state accordingly.
>>>> >
>>>> > Playing around with the code, it seems that this would be possible,
>>>> > but it could require a large code change, including possibly
>>>> > duplicating the entire Flink RemoteStreamEnvironment
>>>> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java>
>>>> > code inside Beam to customize it even more than it is already.
>>>> >
>>>> > Is there any ticket tracking this already?
>>>> >
>>>> > Cheers,
>>>> > Enrico
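To make Max's option 2 and Enrico's JobSubmissionResult/Future idea concrete, below is a rough sketch against the Flink 1.8-era client API: submit the JobGraph detached via RestClusterClient, then track the job status asynchronously so the runner could update the Beam job state from what Flink actually reports. The class and method names used here (RestClusterClient, setDetached, getJobStatus) reflect my reading of that API and may differ between Flink versions; treat this as a sketch, not a verified implementation:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;

public class DetachedSubmissionSketch {

  public static void submitAndTrack(JobGraph jobGraph, Configuration flinkConf)
      throws Exception {
    // RestClusterClient talks to the Dispatcher's REST endpoint directly.
    RestClusterClient<String> client = new RestClusterClient<>(flinkConf, "beam-sketch");
    client.setDetached(true);

    // In detached mode this returns a JobSubmissionResult as soon as the job
    // is accepted, instead of blocking until the job finishes.
    JobID jobId =
        client.submitJob(jobGraph, DetachedSubmissionSketch.class.getClassLoader()).getJobID();

    // Poll the actual job status asynchronously; this is where a runner could
    // map Flink's JobStatus onto Beam's job state and update the JobInvocation.
    ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
    poller.scheduleAtFixedRate(
        () ->
            client.getJobStatus(jobId)
                .thenAccept(
                    status -> {
                      System.out.println("Flink reports " + jobId + " as " + status);
                      if (status.isGloballyTerminalState()) {
                        poller.shutdown();
                      }
                    }),
        0, 2, TimeUnit.SECONDS);
  }
}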
