Hi again,

Just to clarify, I am not against exposing the Pipeline if this will lead to a "clean" solution. And I forgot to say that the last solution, if adopted, would have to work on the JobGraph, which may not be that desirable.
Kostas

On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
> Hi all,
>
> I do not have a strong opinion on the topic yet, but I would like to share my thoughts on this.
>
> In the solution proposing a wrapping AtlasExecutor around the Flink Executors, if we allow the user to use the CLI to submit jobs, then this means that the CLI code may have to change so that it injects the executor option to AtlasExecutor (transparently to the user), and then the AtlasExecutor should take what the user has actually set as pipeline executor and find the adequate executor. If this is not done transparently, then the user has to do something explicit to point Flink to Atlas and then to the correct executor, which implies that we should add user-facing stuff (like CLI options) to Flink.
>
> For the solution of adding getPipeline() to the JobListener, I think that, from a design perspective, it does not fit in the listener itself. The listener is a "passive" entity that is expected to listen to specific "events". Adding a getter does not fit there. Other options for the getPipeline() method are:
> 1) add it as a method to the JobClient
> 2) add it as an argument to the methods of the JobListener (along with the JobClient and the throwable)
>
> Alternative 1) would currently work because the JobClient is only instantiated by the executor. But in the future, we may (and probably will, because of the implications of FLIP-85) allow a JobClient to get "attached" to a running job. In that case, getPipeline() would not have a pipeline to return.
> Alternative 2) will break existing code. I am not sure how important that is, as the JobListener is a new feature and I guess it has some, but not many, users.
>
> As a side note, if I am not mistaken, apart from Yarn, none of the above solutions would work in per-job mode for Kubernetes, Mesos, or with web submissions. These modes go through "special" execution environments that are used simply to extract the JobGraph, which is then submitted to the cluster. In this case, there is no executor involved. Are these cases important to you?
>
> Finally, another solution, although more drastic and more involved, could be to have a "JobListener" running on the JobMaster. This would collect the relevant info and send it to Atlas. But I am not sure how Atlas works and whether it requires the data to be extracted on the client side. I am saying this because the JobMasters may be running anywhere in a cluster, while the clients may run on designated machines which can have specific configurations, e.g. open ports to communicate with a specific Atlas server.
>
> Cheers,
> Kostas
>
> On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi Gyula!
> >
> > My main motivation was to try and avoid mixing an internal interface (Pipeline) with the public API. It looks like this is trying to go "public stable", but doesn't really achieve it, exactly because of mixing "pipeline" into this. You would need to cast "Pipeline" and work on internal classes in the implementation.
> >
> > If we use an "internal API" or a "semi-stable SPI" class, it looks at first glance a bit cleaner and more maintainable (opening up less surface) to make the PipelineExecutor a "stable SPI". I have not checked out all the details, though.
> >
> > Best,
> > Stephan
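To make Kostas's alternative 2) concrete, here is a hypothetical sketch of what passing the Pipeline into the listener callbacks could look like, based on the JobListener interface as it stands in Flink 1.10. The Pipeline parameter is the proposed addition, not existing API, and as noted above it would break current JobListener implementations:

    import javax.annotation.Nullable;

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.core.execution.JobClient;

    // Hypothetical variant of JobListener for alternative 2): the Pipeline
    // is handed to the callback alongside the JobClient, instead of adding
    // a getter to the listener or to the JobClient.
    public interface PipelineAwareJobListener {

        // Called on job submission; the pipeline parameter is the addition.
        void onJobSubmitted(
                @Nullable JobClient jobClient,
                @Nullable Pipeline pipeline,
                @Nullable Throwable throwable);

        // Called when execution finishes (unchanged from the current API).
        void onJobExecuted(
                @Nullable JobExecutionResult jobExecutionResult,
                @Nullable Throwable throwable);
    }

An Atlas hook implemented against such an interface could traverse the pipeline in onJobSubmitted() and would never need to touch the executor machinery.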
> > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Hi Stephan!
> > >
> > > Thanks for checking this out. I agree that wrapping the other PipelineExecutors with a delegating AtlasExecutor would be a good alternative approach to implement this, but I actually feel that it suffers from even more problems than exposing the Pipeline instance in the JobListener.
> > >
> > > The main idea with the Atlas integration would be to have the Atlas hook logic in the Atlas project, where it would be maintained. This means that any approach we take has to rely on public APIs. The JobListener is already a public evolving API, while the PipelineExecutor and its factory are purely internal. Even if we made them public, they would still expose the Pipeline, so we would not gain much on the public/internal API front.
> > >
> > > I also feel that, since the Atlas hook logic should only observe the pipeline and collect information, the JobListener interface seems an ideal match and the implementation can be pretty lightweight. From a purely implementation perspective, adding an Executor would be more heavyweight, as it has to properly delegate to another executor, making sure that we don't break anything.
> > >
> > > Don't get me wrong, I am not opposed to reworking the implementations we have, as they are very simple at this point, but I also want to make sure that we take the approach that is simple from a maintainability standpoint. Of course, my argument rests on the assumption that the AtlasHook itself will live outside of the Flink project; that's another question.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Hi all!
> > > >
> > > > In general, nice idea to support this integration with Atlas.
> > > >
> > > > I think we could make this a bit easier/more lightweight with a small change. One of the issues that is not super nice is that this starts exposing the (currently empty) Pipeline interface in the public API. The Pipeline is an SPI interface that would be good to hide in the API.
> > > >
> > > > Since 1.10, Flink has the notion of Executors, which take the pipeline and execute it. Meaning each pipeline is passed on anyway. And executors are already configurable in the Flink configuration. So, instead of passing the pipeline both "down" (to the executor) and "to the side" (to the JobListener), could we just have a wrapping "AtlasExecutor" that takes the pipeline, does whatever it wants, and then passes it to the proper executor? This would also have the advantage that it supports making changes to the pipeline, if needed in the future. For example, if there is ever the need to add additional configuration fields, set properties, add "labels" or so, this could be easily done in the suggested approach.
> > > >
> > > > I tried to sketch this in the picture below, pardon my bad drawing.
> > > >
> > > > [image: Listener_Executor.png]
> > > > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > >
> > > > Best,
> > > > Stephan
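A minimal sketch of the wrapping executor described above, assuming the internal PipelineExecutor interface as of Flink 1.10. The AtlasHook placeholder stands in for whatever lineage-extraction logic Atlas actually needs:

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.core.execution.PipelineExecutor;

    // Delegating executor: observe (or even mutate) the pipeline, then
    // hand it to the executor the user actually configured.
    public class AtlasExecutor implements PipelineExecutor {

        private final PipelineExecutor delegate;

        public AtlasExecutor(PipelineExecutor delegate) {
            this.delegate = delegate;
        }

        @Override
        public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration)
                throws Exception {
            // Extract lineage metadata before submission. The pipeline could
            // also be modified here (labels, extra properties, ...), which a
            // side-channel listener cannot do.
            AtlasHook.reportLineage(pipeline, configuration);
            return delegate.execute(pipeline, configuration);
        }

        // Placeholder for the actual Atlas reporting logic.
        static final class AtlasHook {
            static void reportLineage(Pipeline pipeline, Configuration configuration) {
                // e.g. walk the pipeline's transformations and emit Atlas entities
            }
        }
    }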
> > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >
> > > >> Thanks! I'm reading the document now and will get back to you.
> > > >>
> > > >> Best,
> > > >> Aljoscha
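For completeness, Kostas's point about injecting the executor option transparently could play out at the factory level roughly as follows. This assumes the 1.10-era PipelineExecutorFactory SPI; the "atlas.delegate.target" key and the DelegateResolver helper are invented for illustration, since in practice the CLI (or whatever does the injection) would have to stash the user's original execution.target somewhere the wrapper can find it:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.core.execution.PipelineExecutor;
    import org.apache.flink.core.execution.PipelineExecutorFactory;

    // Factory that lets "atlas" be selected as execution.target while
    // remembering the executor the user really wanted.
    public class AtlasExecutorFactory implements PipelineExecutorFactory {

        @Override
        public String getName() {
            return "atlas";
        }

        @Override
        public boolean isCompatibleWith(Configuration configuration) {
            return "atlas".equals(configuration.getString(DeploymentOptions.TARGET));
        }

        @Override
        public PipelineExecutor getExecutor(Configuration configuration) {
            // Hypothetical: the original target was moved aside when Atlas
            // support was enabled, so look it up and wrap the real executor
            // (AtlasExecutor as sketched earlier in the thread).
            String realTarget = configuration.getString("atlas.delegate.target", "local");
            return new AtlasExecutor(DelegateResolver.resolve(realTarget, configuration));
        }

        // Hypothetical helper that finds the real executor for a target name,
        // e.g. by consulting the executor factories on the classpath.
        static final class DelegateResolver {
            static PipelineExecutor resolve(String target, Configuration configuration) {
                throw new UnsupportedOperationException("sketch only");
            }
        }
    }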