Hi Gyula,

thanks for starting this discussion. Before diving into the details of how to
implement this feature, I wanted to ask whether it is strictly required
that the Atlas integration lives within Flink or not? Could it also work if
you have a tool which receives job submissions, extracts the required
information, forwards the job submission to Flink, monitors the execution
result and finally publishes some information to Atlas (modulo some other
steps which are missing in my description)? Having a different layer being
responsible for this would keep complexity out of Flink.
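
To make the idea a bit more concrete, here is a rough sketch of such a layer.
All names in it (JobLineage, JobRunner, LineagePublisher, SubmissionGateway)
are hypothetical and exist neither in Flink nor in Atlas; the step which
extracts the sources and sinks from the submission is assumed to have happened
already and is not shown:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lineage summary extracted from a job submission.
final class JobLineage {
    final String jobName;
    final List<String> inputs;
    final List<String> outputs;

    JobLineage(String jobName, List<String> inputs, List<String> outputs) {
        this.jobName = jobName;
        // Defensive copies so later changes by the caller do not leak in.
        this.inputs = new ArrayList<>(inputs);
        this.outputs = new ArrayList<>(outputs);
    }
}

// Hypothetical stand-in for whatever forwards the job to Flink.
interface JobRunner {
    // Submits the job and blocks until it finishes; returns true on success.
    boolean submitAndWait(String jobName) throws Exception;
}

// Hypothetical stand-in for a client that talks to Atlas.
interface LineagePublisher {
    void publish(JobLineage lineage, boolean succeeded);
}

// The layer in front of Flink: forward, wait for the result, publish.
final class SubmissionGateway {
    private final JobRunner runner;
    private final LineagePublisher publisher;

    SubmissionGateway(JobRunner runner, LineagePublisher publisher) {
        this.runner = runner;
        this.publisher = publisher;
    }

    boolean handle(JobLineage lineage) {
        boolean succeeded;
        try {
            succeeded = runner.submitAndWait(lineage.jobName);
        } catch (Exception e) {
            // A failed submission or execution is reported as unsuccessful.
            succeeded = false;
        }
        publisher.publish(lineage, succeeded);
        return succeeded;
    }
}
```

With something along these lines, Flink itself would only need to accept the
forwarded job; everything Atlas-specific would stay behind LineagePublisher.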

Cheers,
Till

On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra <gyf...@apache.org> wrote:

> Hi all!
>
> We have started some preliminary work on the Flink - Atlas integration at
> Cloudera. It seems that the integration will require some new hook
> interfaces at the jobgraph generation and submission phases, so I figured I
> will open a discussion thread with my initial ideas to get some early
> feedback.
>
> *Minimal background*
> Very simply put, Apache Atlas is a data governance framework that stores
> metadata for our data and processing logic to track ownership, lineage etc.
> It is already integrated with systems like HDFS, Kafka, Hive and many
> others.
>
> Adding Flink integration would mean that we can track the input and output data
> of our Flink jobs, their owners and how different Flink jobs are connected
> to each other through the data they produce (lineage). This seems to be a
> very big deal for a lot of companies :)
>
> *Flink - Atlas integration in a nutshell*
> In order to integrate with Atlas we basically need 2 things.
>  - Flink entity definitions
>  - Flink Atlas hook
>
> The entity definition is the easy part. It is a JSON document that contains
> the objects (entities) that we want to store for any given Flink job. As a
> starter we could have a single FlinkApplication entity that has a set of
> inputs and outputs. These inputs/outputs are other Atlas entities that are
> already defined, such as a Kafka topic or an HBase table.
>
> The Flink atlas hook will be the logic that creates the entity instance and
> uploads it to Atlas when we start a new Flink job. This is the part where
> we implement the core logic.
>
> *Job submission hook*
> In order to implement the Atlas hook we need a place where we can inspect
> the pipeline, create and send the metadata when the job starts. When we
> create the FlinkApplication entity we need to be able to easily determine
> the sources and sinks (and their properties) of the pipeline.
>
> Unfortunately there is no JobSubmission hook in Flink that could execute
> this logic and even if there was one there is a mismatch of abstraction
> levels needed to implement the integration.
> We could imagine a JobSubmission hook executed in the JobManager runner like
> this:
>
> void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);
>
> This is nice but the JobGraph makes it super difficult to extract sources
> and UDFs to create the metadata entity. The atlas entity however could be
> easily created from the StreamGraph object (used to represent the logical
> flow) before the JobGraph is generated. To work around this limitation we
> could add a JobGraphGeneratorHook interface:
>
> void preProcess(StreamGraph streamGraph);
> void postProcess(JobGraph jobGraph);
>
> We could then generate the atlas entity in the preprocess step and add a
> job submission hook in the postprocess step that will simply send the already
> baked in entity.
>
> *This kinda works but...*
> The approach outlined above seems to work and we have built a POC using it.
> Unfortunately it is far from nice as it exposes non-public APIs such as the
> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
>
> It would be much nicer if we could somehow go back from JobGraph to
> StreamGraph or at least have an easy way to access source/sink UDFs.
>
> What do you think?
>
> Cheers,
> Gyula
>
