I think exposing the Pipeline should be OK. Using the internal StreamGraph might be problematic because it can change or break, but that is a problem for the external code.

Aljoscha

On 11.02.20 16:26, Gyula Fóra wrote:
Hi All!

I have made a prototype that simply adds a getPipeline() method to the
JobClient interface. Then I could easily implement the Atlas hook using the
JobListener interface. I simply check if Pipeline is instanceof StreamGraph
and do the logic there.
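
A minimal sketch of that listener, for illustration only. Every type below
is a simplified stand-in declared inside the example, not the real Flink
class, and getPipeline() is the prototype method described above, not an
existing JobClient API. The registered list merely stands in for whatever
the Atlas hook would do with the StreamGraph.

```java
import java.util.ArrayList;
import java.util.List;

public class AtlasHookSketch {

    // Stand-ins for the Flink types named in this thread; NOT the real classes.
    interface Pipeline {}

    static final class StreamGraph implements Pipeline {
        final String jobName;
        StreamGraph(String jobName) { this.jobName = jobName; }
    }

    static final class Plan implements Pipeline {}

    // Hypothetical: getPipeline() is only the prototype addition discussed here.
    interface JobClient {
        Pipeline getPipeline();
    }

    // Simplified listener that keeps only the submission callback.
    interface JobListener {
        void onJobSubmitted(JobClient jobClient, Throwable throwable);
    }

    static final class AtlasJobListener implements JobListener {
        // Stands in for the metadata that would be sent to Atlas.
        final List<String> registered = new ArrayList<>();

        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            if (throwable != null || jobClient == null) {
                return; // submission failed, nothing to register
            }
            Pipeline pipeline = jobClient.getPipeline();
            if (pipeline instanceof StreamGraph) {
                registered.add(((StreamGraph) pipeline).jobName);
            }
            // A batch Plan is ignored in this sketch.
        }
    }

    public static void main(String[] args) {
        AtlasJobListener listener = new AtlasJobListener();
        listener.onJobSubmitted(() -> new StreamGraph("wordcount"), null);
        listener.onJobSubmitted(() -> new Plan(), null);
        System.out.println(listener.registered);
    }
}
```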

I think this is so far the cleanest approach and I much prefer this
compared to working on the JobGraph directly which would expose even more
messy internals.

Unfortunately this change alone is not enough for the integration, as we
need to make sure that all Sources/Sinks that we want to integrate with
Atlas publicly expose some of their properties:

    - Kafka source/sink:
       - Kafka props
       - Topic(s) - this is tricky for sinks
    - FS source/sink:
       - Hadoop props
       - Base path for StreamingFileSink
       - Path for ContinuousMonitoringSource

Most of these are straightforward changes; the only question is what we
want to register in Atlas from the available connectors. Ideally users
could also register their own Atlas metadata for custom sources and
sinks. We could probably introduce an interface for that in Atlas.
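
One possible shape for such an interface, as a rough sketch only. The name
AtlasMetadataProvider, its two methods, the MyTopicSink class and the
"kafka_topic" type string are all assumptions made up for this example;
nothing like this exists in Flink or Atlas today.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConnectorMetadataSketch {

    // Hypothetical interface; neither Flink nor Atlas ships this.
    interface AtlasMetadataProvider {
        String entityType();
        Map<String, String> entityProperties();
    }

    // Stand-in for a user-defined sink that opts in to metadata reporting.
    static final class MyTopicSink implements AtlasMetadataProvider {
        private final String topic;

        MyTopicSink(String topic) { this.topic = topic; }

        @Override
        public String entityType() { return "kafka_topic"; } // illustrative value

        @Override
        public Map<String, String> entityProperties() {
            Map<String, String> props = new LinkedHashMap<>();
            props.put("topic", topic);
            return props;
        }
    }

    // Collects metadata from operators that implement the interface and
    // silently skips the ones that do not.
    static List<String> describe(List<Object> operators) {
        List<String> out = new ArrayList<>();
        for (Object op : operators) {
            if (op instanceof AtlasMetadataProvider) {
                AtlasMetadataProvider p = (AtlasMetadataProvider) op;
                out.add(p.entityType() + p.entityProperties());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> ops = new ArrayList<>();
        ops.add(new MyTopicSink("orders"));
        ops.add("an operator without metadata");
        System.out.println(describe(ops));
    }
}
```

The hook would then only need an instanceof check per source or sink,
without knowing anything about the concrete connector class.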

Cheers,
Gyula

On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

Maybe we could improve the Pipeline interface in the long run, but as a
temporary solution the JobClient could expose a getPipeline() method.

That way the implementation of the JobListener could check if it's a
StreamGraph or a Plan.

How bad does that sound?

Gyula

On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Aljoscha!

That's a valid concern, but we should try to figure something out. Many
users need this before they can use Flink.

I think the closest thing we have right now is the StreamGraph. In
contrast with the JobGraph, the StreamGraph is pretty nice from a metadata
perspective :D
The big downside of exposing the StreamGraph is that we don't have it in
batch. On the other hand we could expose the JobGraph but then the
integration component would still have to do the heavy lifting for batch
and stream specific operators and UDFs.

Instead of exposing either StreamGraph/JobGraph, we could come up with a
metadata-like representation for the users, but that would be like
implementing the Atlas integration itself without Atlas dependencies :D

As a comparison point, this is how it works in Storm:
Every operator (spout/bolt) stores a config map (string->string) with
all the metadata such as operator class, and the operator specific configs.
The Atlas hook works on this map.
This is very fragile and depends on a lot of internals. Kind of like
exposing the JobGraph but much worse. I think we can do better.

Gyula

On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

If we need it, we can probably beef up the JobListener to allow
accessing some information about the whole graph or sources and sinks.
My only concern right now is that we don't have a stable interface for
our job graphs/pipelines.

Best,
Aljoscha

On 06.02.20 23:00, Gyula Fóra wrote:
Hi Jeff & Till!

Thanks for the feedback, this is exactly the discussion I was looking for.
The JobListener looks very promising if we can expose the JobGraph somehow
(correct me if I am wrong but it is not accessible at the moment).

I did not know about this feature, which is why I added my JobSubmission
hook, which was pretty similar but only exposed the JobGraph. In general I
like the listener better and I would not like to add anything extra if we
can avoid it.

Actually the bigger part of the integration work that will need more
changes in Flink will be regarding the accessibility of sources/sinks from
the JobGraph and their specific properties. For instance, at the moment the
Kafka sources and sinks do not expose anything publicly, such as topics,
Kafka configs, etc. The same goes for other data connectors that we need to
integrate in the long run. I guess there will be a separate thread on this
once we iron out the initial integration points :)

I will try to play around with the JobListener interface tomorrow and see
if I can extend it to meet our needs.

Cheers,
Gyula

On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Gyula,

Flink 1.10 introduced JobListener, which is invoked after job submission
and after the job finishes. Maybe we can add an API on JobClient to get the
info you need for the Atlas integration.



https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46


On Wed, Feb 5, 2020 at 7:48 PM Gyula Fóra <gyf...@apache.org> wrote:

Hi all!

We have started some preliminary work on the Flink - Atlas integration at
Cloudera. It seems that the integration will require some new hook
interfaces at the JobGraph generation and submission phases, so I figured I
will open a discussion thread with my initial ideas to get some early
feedback.

*Minimal background*
Very simply put, Apache Atlas is a data governance framework that stores
metadata for our data and processing logic to track ownership, lineage etc.
It is already integrated with systems like HDFS, Kafka, Hive and many
others.

Adding Flink integration would mean that we can track the input and output
data of our Flink jobs, their owners and how different Flink jobs are
connected to each other through the data they produce (lineage). This seems
to be a very big deal for a lot of companies :)

*Flink - Atlas integration in a nutshell*
In order to integrate with Atlas we basically need 2 things.
   - Flink entity definitions
   - Flink Atlas hook

The entity definition is the easy part. It is a JSON file that contains the
objects (entities) that we want to store for any given Flink job. As a
starter we could have a single FlinkApplication entity that has a set of
inputs and outputs. These inputs/outputs are other Atlas entities that are
already defined, such as a Kafka topic or an HBase table.

The Flink Atlas hook will be the logic that creates the entity instance and
uploads it to Atlas when we start a new Flink job. This is the part where
we implement the core logic.

*Job submission hook*
In order to implement the Atlas hook we need a place where we can inspect
the pipeline, create and send the metadata when the job starts. When we
create the FlinkApplication entity we need to be able to easily determine
the sources and sinks (and their properties) of the pipeline.

Unfortunately there is no JobSubmission hook in Flink that could execute
this logic, and even if there was one, there is a mismatch of abstraction
levels needed to implement the integration.
We could imagine a JobSubmission hook executed in the JobManager runner
like this:

void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);

This is nice, but the JobGraph makes it super difficult to extract sources
and UDFs to create the metadata entity. The Atlas entity however could be
easily created from the StreamGraph object (used to represent the logical
flow) before the JobGraph is generated. To get around this limitation we
could add a JobGraphGeneratorHook interface:

void preProcess(StreamGraph streamGraph);
void postProcess(JobGraph jobGraph);

We could then generate the Atlas entity in the preProcess step and add a
job submission hook in the postProcess step that will simply send the
already baked-in entity.
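
To make the two-step flow concrete, here is a rough sketch. StreamGraph and
JobGraph below are tiny stand-ins declared in the example, not the real
Flink classes; the submissionHooks list, the entity string and the sent list
(standing in for an Atlas client) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoHookSketch {

    // Stand-ins for the Flink graph types; NOT the real classes.
    static final class StreamGraph {
        final String jobName;
        StreamGraph(String jobName) { this.jobName = jobName; }
    }

    static final class JobGraph {
        // Hypothetical place to attach callbacks that run on submission.
        final List<Runnable> submissionHooks = new ArrayList<>();
    }

    // The proposed hook interface from this mail.
    interface JobGraphGeneratorHook {
        void preProcess(StreamGraph streamGraph);
        void postProcess(JobGraph jobGraph);
    }

    static final class AtlasGeneratorHook implements JobGraphGeneratorHook {
        final List<String> sent = new ArrayList<>(); // stands in for the Atlas client
        private String entity;

        @Override
        public void preProcess(StreamGraph streamGraph) {
            // Build the entity while the logical graph is still available.
            entity = "FlinkApplication(" + streamGraph.jobName + ")";
        }

        @Override
        public void postProcess(JobGraph jobGraph) {
            // Attach a submission hook that only sends the prebuilt entity.
            final String baked = entity;
            jobGraph.submissionHooks.add(() -> sent.add(baked));
        }
    }

    public static void main(String[] args) {
        AtlasGeneratorHook hook = new AtlasGeneratorHook();
        JobGraph jobGraph = new JobGraph();
        hook.preProcess(new StreamGraph("wordcount"));
        hook.postProcess(jobGraph);
        jobGraph.submissionHooks.forEach(Runnable::run); // simulate submission
        System.out.println(hook.sent);
    }
}
```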

*This kinda works but...*
The approach outlined above seems to work and we have built a POC using it.
Unfortunately it is far from nice as it exposes non-public APIs such as the
StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.

It would be much nicer if we could somehow go back from JobGraph to
StreamGraph or at least have an easy way to access source/sink UDFs.

What do you think?

Cheers,
Gyula



--
Best Regards

Jeff Zhang




