Hello,

while launching Flink jobs using the PortableRunner I noticed that jobs are
marked by Beam as RUNNING before they are actually running on Flink. The
issue doesn't seem specific to the Flink Runner though:
https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90

I'd assume the TODO in the code refers exactly to this issue. For Flink
specifically, I guess one of the problems is that jobs are submitted using a
blocking API from the BeamFlinkRemoteStreamEnvironment
<https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361>.
So we essentially never return from the job submission unless the job
completes (which never happens for streaming), is cancelled, or fails. A
possible solution is to set the Flink client as detached, return a
JobSubmissionResult instead of a JobExecutionResult, and then have a Future
or event loop that tracks the actual job execution and updates the job
state accordingly.
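
Just to make the idea concrete, here's a rough sketch of what I have in
mind. Take it with a grain of salt: method names and packages may differ
between Flink versions (e.g. whether ClusterClient exposes getJobStatus),
and updateBeamJobState is a hypothetical placeholder for however
JobInvocation publishes state changes to its listeners.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;

class DetachedSubmissionSketch {

  private final ScheduledExecutorService poller =
      Executors.newSingleThreadScheduledExecutor();

  JobID submitDetached(ClusterClient<?> client, JobGraph jobGraph, ClassLoader loader)
      throws Exception {
    // Submit without blocking until job completion: we only get back the JobID.
    client.setDetached(true);
    JobSubmissionResult submission = client.submitJob(jobGraph, loader);
    JobID jobId = submission.getJobID();

    // Poll the actual job status and propagate it to Beam, instead of
    // optimistically reporting RUNNING at submission time.
    poller.scheduleAtFixedRate(() -> {
      try {
        JobStatus status = client.getJobStatus(jobId).get();
        switch (status) {
          case RUNNING:
            updateBeamJobState("RUNNING");
            break;
          case FINISHED:
            updateBeamJobState("DONE");
            poller.shutdown();
            break;
          case CANCELED:
            updateBeamJobState("CANCELLED");
            poller.shutdown();
            break;
          case FAILED:
            updateBeamJobState("FAILED");
            poller.shutdown();
            break;
          default:
            // CREATED, RESTARTING, etc. -- leave the Beam state unchanged.
        }
      } catch (Exception e) {
        // In a real implementation this would be surfaced, not swallowed.
      }
    }, 0, 1, TimeUnit.SECONDS);

    return jobId;
  }

  // Hypothetical stand-in for JobInvocation's state listener mechanism.
  private void updateBeamJobState(String state) {
    System.out.println("Beam job state -> " + state);
  }
}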

Playing around with the code, it seems this would be possible, but it could
require a large code change, possibly including duplicating the entire
Flink RemoteStreamEnvironment
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java>
code inside Beam to customize it even further than it already is.

Is there any ticket tracking this already?

Cheers,
Enrico
