Hi Shammon,

Thanks for the update!

Best regards,
Jing

On Fri, Jul 7, 2023 at 4:46 AM Shammon FY <zjur...@gmail.com> wrote:

> Thanks Jing, sounds good to me.
>
> I have updated the FLIP and renamed the lineage related classes to
> `LineageGraph`, `LineageVertex` and `LineageEdge` to keep them consistent
> with the job definition in Flink.
>
> Best,
> Shammon FY
>
> On Thu, Jul 6, 2023 at 8:25 PM Jing Ge <j...@ververica.com.invalid> wrote:
>
> > Hi Shammon,
> >
> > Thanks for the clarification. Atlas might have its historical reasons, going
> > back to the Hadoop era or maybe even to Hibernate, where Entity and
> > Relation were commonly used. Flink already uses Vertex and Edge to describe
> > a DAG. Some popular tools like dbt also use this convention[1] and,
> > afaik, most graph frameworks use vertex and edge too. It will be easier for
> > Flink devs and users to have a consistent naming convention for the same
> > concept, i.e. in this case, the DAG.
> >
> > Best regards,
> > Jing
> >
> > [1] https://docs.getdbt.com/docs/dbt-cloud-apis/discovery-use-cases-and-examples#discovery
> >
> > On Wed, Jul 5, 2023 at 11:28 AM Shammon FY <zjur...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for your feedback.
> > >
> > > > 1. TableColumnLineageRelation#sinkColumn() should return
> > > TableColumnLineageEntity instead of String, right?
> > >
> > > The `sinkColumn()` will return a `String`, which is the column name in the
> > > sink connector. I found that the name `TableColumnLineageEntity` may
> > > cause ambiguity, so I have renamed it to `TableColumnSourceLineageEntity`.
> > > In my mind, `TableColumnLineageRelation` represents the lineage for each
> > > sink column; each column may be computed from multiple sources and columns.
> > > I use `TableColumnSourceLineageEntity` to manage each source and its
> > > columns for the sink column, so `TableColumnLineageRelation` has a sink
> > > column name and a list of `TableColumnSourceLineageEntity`.
> > >
> > > > 2. Since LineageRelation already contains all information to build the
> > > lineage between sources and sink, do we still need to set the LineageEntity
> > > in the source?
> > >
> > > The lineage interface of `DataStream` is very flexible. We have added
> > > `setLineageEntity` to the source to limit and verify user behavior,
> > > ensuring that users have not added non-existent sources as lineage.
> > >
> > > > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > > Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> > > which contains multiple LineageEdge?
> > >
> > > We referred to `Atlas` for the lineage naming: it uses `Entity` and
> > > `Relation` to represent the lineage relationship, while another metadata
> > > service, `Datahub`, uses `DataSet` to represent the entity. I think `Entity`
> > > and `Relation` are nicer for lineage. What do you think?
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <j...@ververica.com.invalid>
> > > wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for your proposal. After reading the FLIP, I'd like to ask
> > > > some questions to make sure we are on the same page. Thanks!
> > > >
> > > > 1. TableColumnLineageRelation#sinkColumn() should return
> > > > TableColumnLineageEntity instead of String, right?
> > > >
> > > > 2. Since LineageRelation already contains all information to build the
> > > > lineage between sources and sink, do we still need to set the
> > > > LineageEntity in the source?
> > > >
> > > > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > > > Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> > > > which contains multiple LineageEdge? E.g. multiple sources join into one
> > > > sink, or, edges of columns from one or different tables, etc.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zjur...@gmail.com> wrote:
> > > >
> > > > > Hi yuxia and Yun,
> > > > >
> > > > > Thanks for your input.
> > > > >
> > > > > For yuxia:
> > > > > > 1: What kinds of JobStatus will the `JobExecutionStatusEvent` include?
> > > > >
> > > > > At present, we only need to notify the listener when a job goes to
> > > > > termination, but I think it makes sense to add generic `oldStatus` and
> > > > > `newStatus` in the listener so that users can update the job state in
> > > > > their service as needed.
> > > > >
> > > > > > 2: I'm really confused about the `config()` included in `LineageEntity`,
> > > > > where is it from and what is it for?
> > > > >
> > > > > The `config` in `LineageEntity` lets users get options for source
> > > > > and sink connectors. As in the examples in the FLIP, users can add
> > > > > server/group/topic information to the config for Kafka and create lineage
> > > > > entities for `DataStream` jobs; the listeners can then use this information
> > > > > to identify the same connector in different jobs. Otherwise, the `config`
> > > > > in `TableLineageEntity` will be the same as `getOptions` in
> > > > > `CatalogBaseTable`.
> > > > >
> > > > > > 3: Regardless of whether `inputChangelogMode` in `TableSinkLineageEntity`
> > > > > is needed or not, since `TableSinkLineageEntity` contains
> > > > > `inputChangelogMode`, why doesn't `TableSourceLineageEntity` contain
> > > > > a changelog mode?
> > > > >
> > > > > At present, we do not actually use the changelog mode. It can be deleted,
> > > > > and I have updated the FLIP.
> > > > >
> > > > > > Btw, since there are a lot of interfaces proposed, I think it would be
> > > > > better to give an example of how to implement a listener in this FLIP, to
> > > > > help us understand the interfaces better.
> > > > >
> > > > > I have added the example in the FLIP and the related interfaces and
> > > > > examples are in branch [1].
> > > > >
> > > > > For Yun:
> > > > > > I have one more question on the lookup-join dim tables: it seems this
> > > > > FLIP does not touch them. Will they become part of the
> > > > > List<LineageEntity> sources(), or will another interface be added?
> > > > >
> > > > > You're right, lookup join dim tables are currently not considered in the
> > > > > 'proposed changes' section of this FLIP. But the interface for lineage is
> > > > > universal and we can provide `TableLookupSourceLineageEntity`, which
> > > > > implements `TableSourceLineageEntity`, in the future without modifying the
> > > > > public interface.
> > > > >
> > > > > > By the way, if you want to focus on job lineage instead of data column
> > > > > lineage in this FLIP, why must we introduce so many column-lineage related
> > > > > interfaces here?
> > > > >
> > > > > The lineage information in SQL jobs includes table lineage and column
> > > > > lineage. Although SQL jobs currently do not support column lineage, we
> > > > > would like to support this in the next step. So we have comprehensively
> > > > > considered the table lineage and column lineage interfaces here, and
> > > > > defined these two interfaces together clearly.
> > > > >
> > > > >
> > > > > [1] https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <myas...@live.com> wrote:
> > > > >
> > > > > > Hi Shammon,
> > > > > >
> > > > > > I like the idea in general, and it will help to analyze the job lineages
> > > > > > of both Flink SQL and Flink jar jobs in production environments.
> > > > > >
> > > > > > For Qingsheng's concern, I prefer the name JobType over
> > > > > > RuntimeExecutionMode, as the latter is not easy for users to understand.
> > > > > >
> > > > > > I have one more question on the lookup-join dim tables: it seems this FLIP
> > > > > > does not touch them. Will they become part of the List<LineageEntity>
> > > > > > sources(), or will another interface be added?
> > > > > >
> > > > > > By the way, if you want to focus on job lineage instead of data column
> > > > > > lineage in this FLIP, why must we introduce so many column-lineage related
> > > > > > interfaces here?
> > > > > >
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > > ________________________________
> > > > > > From: Shammon FY <zjur...@gmail.com>
> > > > > > Sent: Sunday, June 25, 2023 16:13
> > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
> > > > > >
> > > > > > Hi Qingsheng,
> > > > > >
> > > > > > Thanks for your valuable feedback.
> > > > > >
> > > > > > > 1. Is there any specific use case to expose the batch / streaming info
> > > > > > to listeners or meta services?
> > > > > >
> > > > > > I agree with you that Flink is evolving towards batch-streaming
> > > > > > unification, but the lifecycles of the two are different. If a job
> > > > > > processes a bounded dataset, it will end after completing the data
> > > > > > processing; otherwise, it will run for a long time. In our scenario, we
> > > > > > regularly schedule some Flink jobs to process bounded datasets and, for
> > > > > > these "batch" jobs, add some job information to the lineage information,
> > > > > > such as the scheduled timestamp and the execution duration when jobs are
> > > > > > finished, which is different from "streaming" jobs. Currently Flink uses
> > > > > > `RuntimeExecutionMode` and `existsUnboundedSource` in `StreamGraph` and
> > > > > > `StreamGraphGenerator` to determine `JobType` and distinguish jobs. We can
> > > > > > mark `JobType` as `PublicEvolving` or use `RuntimeExecutionMode` and a
> > > > > > boolean flag. What do you think?
> > > > > >
> > > > > > > 2. it’s better to be more specific here to tell users what information
> > > > > > they could expect to see here, instead of just a “job configuration” as
> > > > > > described in JavaDoc.
> > > > > >
> > > > > > Thanks, I have updated the doc in the FLIP.
> > > > > >
> > > > > > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
> > > > > >
> > > > > > I have updated the docs for the IO executor in
> > > > > > `JobStatusChangedListenerFactory.Context`: it is a regular thread pool
> > > > > > and executes submitted tasks in parallel. Users can submit tasks to the
> > > > > > executor, which ensures that the submitted tasks are executed before the
> > > > > > job exits.
> > > > > >
> > > > > > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > > > > LineageEntity.
> > > > > >
> > > > > > In the initial idea, the `LineageRelationEntity` was used for `DataStream`
> > > > > > to set additional lineage information besides the source. For example,
> > > > > > there are table and column lineages in SQL jobs. When we build a
> > > > > > `DataStream` job with a table source and sink, we can add table lineage
> > > > > > with the following method.
> > > > > > ```
> > > > > > public class DataStreamSink {
> > > > > >     public DataStreamSink setLineageSources(LineageEntity... sources);
> > > > > > }
> > > > > > ```
> > > > > > But we cannot set column lineage for the above sink, and for the sake of
> > > > > > universality, we do not want to add a method similar to
> > > > > > `addLineageColumn(...)` in `DataStreamSink`. So I put this information into
> > > > > > `LineageRelationEntity` so that SQL and DataStream jobs can be consistent.
> > > > > > But as you mentioned, this approach does indeed lead to ambiguity and
> > > > > > complexity. So my current idea is to add the `setLineageRelation` method
> > > > > > to `DataStreamSink` directly, without `LineageRelationEntity`. I have
> > > > > > updated the FLIP; please help to review it again, thanks.
> > > > > >
> > > > > > > 5. I can’t find the definition of CatalogContext in the current code base
> > > > > > and Flink, which appears in the TableLineageEntity.
> > > > > >
> > > > > > CatalogContext is defined in FLIP-294, and I have updated the FLIP.
> > > > > >
> > > > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> > > > > > (the “override” is quite confusing). I’m wondering if these are necessary
> > > > > > for meta services, as they are actually concepts defined in the runtime
> > > > > > level of Flink Table / SQL.
> > > > > >
> > > > > > The information in `TableSinkLineageEntity`, such as `ModifyType`,
> > > > > > `ChangelogMode` and `override`, is mainly used for verification and
> > > > > > display. For example, Flink currently supports `INSERT`, `DELETE` and
> > > > > > `UPDATE`; we only want to report and update lineage for `INSERT` jobs in
> > > > > > our streaming & batch ETL, and display the `override` information on the
> > > > > > UI.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Shammon,
> > > > > > >
> > > > > > > Thanks for starting this FLIP! Data lineage is a very important topic,
> > > > > > > which has been missing for a long time in Flink. I have some questions
> > > > > > > about the FLIP.
> > > > > > >
> > > > > > > About events and listeners:
> > > > > > >
> > > > > > > 1. I’m not sure if it is necessary to expose JobType in JobCreatedEvent.
> > > > > > > This is an internal class in flink-runtime, and I think the correct API
> > > > > > > should be RuntimeExecutionMode. Furthermore, I think the boundary of batch
> > > > > > > and streaming becomes much more vague as Flink is evolving towards
> > > > > > > batch-streaming unification, so I’m concerned about exposing JobType as a
> > > > > > > public API. Is there any specific use case to expose the batch / streaming
> > > > > > > info to listeners or meta services?
> > > > > > >
> > > > > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > > > > > ambiguous. To be honest the configuration is quite a mess in Flink, so
> > > > > > > maybe it’s better to be more specific here to tell users what information
> > > > > > > they could expect to see here, instead of just a “job configuration” as
> > > > > > > described in JavaDoc.
> > > > > > >
> > > > > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor. I think
> > > > > > > more information should be provided here, such as which thread model this
> > > > > > > executor could promise, and whether the user should care about concurrency
> > > > > > > issues. Otherwise I prefer not to give such a utility that no one dares to
> > > > > > > use safely, and leave it to users to choose their implementation.
> > > > > > >
> > > > > > > About lineage:
> > > > > > >
> > > > > > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > > > > > LineageEntity. Could you elaborate more on this class? From my naive
> > > > > > > imagination, the lineage is shaped as a DAG, where vertices are sources and
> > > > > > > sinks (LineageEntity) and edges are connections between them
> > > > > > > (LineageRelation), so it is a bit confusing for a name to mix these two
> > > > > > > concepts.
> > > > > > >
> > > > > > > 5. I can’t find the definition of CatalogContext in the current code base
> > > > > > > and Flink, which appears in the TableLineageEntity.
> > > > > > >
> > > > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> > > > > > > (the “override” is quite confusing). I’m wondering if these are necessary
> > > > > > > for meta services, as they are actually concepts defined in the runtime
> > > > > > > level of Flink Table / SQL.
> > > > > > >
> > > > > > > Best,
> > > > > > > Qingsheng
> > > > > > >
> > > > > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi devs,
> > > > > > > >
> > > > > > > > Are there any comments or feedback on this FLIP? Hope to hear from you,
> > > > > > > > thanks.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Shammon FY
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi devs,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on FLIP-314: Support Customized Job
> > > > > > > > > Lineage Listener[1], which is the next stage of FLIP-294 [2]. Flink
> > > > > > > > > streaming and batch jobs create lineage dependencies between sources and
> > > > > > > > > sinks, and users can manage their data and jobs according to this lineage
> > > > > > > > > information. For example, when there is a delay in Flink ETL or data,
> > > > > > > > > users can easily trace the problematic jobs and affected data. On the
> > > > > > > > > other hand, when users need to correct data or debug, they can perform
> > > > > > > > > operations based on lineage too.
> > > > > > > > >
> > > > > > > > > In FLIP-314 we want to introduce lineage related interfaces for Flink so
> > > > > > > > > that users can create customized job status listeners. When the job
> > > > > > > > > status changes, users can get the job status and information to add,
> > > > > > > > > update or delete lineage.
> > > > > > > > >
> > > > > > > > > Looking forward to your feedback, thanks.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Shammon FY
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
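A minimal Java sketch of the vertex/edge naming agreed on in this thread, covering Jing's example of multiple sources joining into one sink. Only the names `LineageGraph`, `LineageVertex` and `LineageEdge` come from the discussion; every method, the `LineageSketch` wrapper, the `joinInto` helper and the table names are assumptions for illustration and are not the interfaces defined in FLIP-314.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch only; not the interfaces defined in FLIP-314. */
final class LineageSketch {

    /** A source or sink in the lineage DAG (formerly "entity"). */
    interface LineageVertex {
        String name(); // assumed accessor
    }

    /** One source-to-sink connection (formerly "relation"). */
    interface LineageEdge {
        LineageVertex source();
        LineageVertex sink();
    }

    /** The lineage DAG of one job. */
    interface LineageGraph {
        List<LineageVertex> sources();
        List<LineageVertex> sinks();
        List<LineageEdge> edges();
    }

    static final class SimpleVertex implements LineageVertex {
        private final String name;
        SimpleVertex(String name) { this.name = name; }
        @Override public String name() { return name; }
    }

    static final class SimpleEdge implements LineageEdge {
        private final LineageVertex source;
        private final LineageVertex sink;
        SimpleEdge(LineageVertex source, LineageVertex sink) {
            this.source = source;
            this.sink = sink;
        }
        @Override public LineageVertex source() { return source; }
        @Override public LineageVertex sink() { return sink; }
    }

    /** Builds a graph in which several sources feed one sink, one edge per source. */
    static LineageGraph joinInto(String sinkName, String... sourceNames) {
        final LineageVertex sink = new SimpleVertex(sinkName);
        final List<LineageVertex> sources = new ArrayList<>();
        final List<LineageEdge> edges = new ArrayList<>();
        for (String sourceName : sourceNames) {
            LineageVertex source = new SimpleVertex(sourceName);
            sources.add(source);
            edges.add(new SimpleEdge(source, sink));
        }
        return new LineageGraph() {
            @Override public List<LineageVertex> sources() {
                return Collections.unmodifiableList(sources);
            }
            @Override public List<LineageVertex> sinks() {
                return Collections.singletonList(sink);
            }
            @Override public List<LineageEdge> edges() {
                return Collections.unmodifiableList(edges);
            }
        };
    }

    public static void main(String[] args) {
        // Table names are made up for the example.
        LineageGraph graph = joinInto("dwd_orders", "ods_orders", "dim_users");
        for (LineageEdge edge : graph.edges()) {
            System.out.println(edge.source().name() + " -> " + edge.sink().name());
        }
    }
}
```

Running `main` prints one `source -> sink` line per edge. Keeping edges as separate objects, rather than a list of vertices, is what would let column-level information be attached to an edge later, which is the concern raised about `LineageRelationEntity` earlier in the thread.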
