Thanks, Shammon, for driving this FLIP. I have some comments about the updated
FLIP.


1. It is confusing to me that the LineageEdge interface contains multiple sources but
only one sink; such a relation looks like a graph rather than an edge in the topology.

2. The TableColumnSourceLineageVertex interface is not clear to me: why is it
designed to return multiple columns? Is there a case like
ColumnAOfSrcT1 -> columnAofSinkT1, ColumnBOfSrcT1 -> columnAofSinkT1?
Minor: the table() method name could be improved, as it returns a
TableLineageVertex instead of a Table.

3. SupportsLineageVertex for DataStream sources and sinks does not feel natural to me.
The underlying logic is that a source/sink can optionally provide lineage information,
so how about changing it to LineageVertexProvider { LineageVertex
getLineageVertex(); }?

4. I'm hesitant to introduce a setLineageVertex method for DataStreamSource and
DataStreamSink because:
   (a) Lineage is different from the parallelism of a source/sink: it should be unique
and fixed for a given source/sink, so I prefer to expose only one interface to
users.
   (b) Compare how a user builds a SourceLineageVertex with how a user implements a
LineageVertexProvider: which one costs more?
   Minor: "SourceLineageEdge LineageEdge;" looks like a typo?

5. DataStreamSink#setLineageEdge is not clear to me now. A sink could then hold all
LineageEdges, even edges whose target is not the sink itself, right? That seems
incorrect.
My intuition is that the lineage of all pipelines in a DataStream
application should belong to the StreamExecutionEnvironment, so introducing a
setLineageEdge method on StreamExecutionEnvironment would be better than the
current proposal.
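A minimal sketch of the shape suggested in points 1, 3 and 5, using simplified stand-in types rather than Flink classes. `LineageVertex`, `LineageEdge` and `LineageVertexProvider` are the names under discussion; `NamedVertex`, `LineageRegistry` and its methods are hypothetical placeholders for what an environment-level `setLineageEdge`-style API might hold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified stand-in for a lineage vertex (a source or a sink).
interface LineageVertex {
    String name();
}

// Hypothetical concrete vertex used only for this sketch.
record NamedVertex(String name) implements LineageVertex {}

// Point 1: an edge links exactly one source vertex to exactly one sink vertex.
record LineageEdge(LineageVertex source, LineageVertex sink) {}

// Point 3: a source or sink may optionally provide its lineage vertex.
interface LineageVertexProvider {
    LineageVertex getLineageVertex();
}

// Point 5: hypothetical environment-level holder for all edges of an application.
final class LineageRegistry {
    private final List<LineageEdge> edges = new ArrayList<>();

    void addLineageEdge(LineageEdge edge) {
        edges.add(edge);
    }

    // Only operators that implement the provider interface contribute a vertex.
    static Optional<LineageVertex> vertexOf(Object sourceOrSink) {
        if (sourceOrSink instanceof LineageVertexProvider provider) {
            return Optional.of(provider.getLineageVertex());
        }
        return Optional.empty();
    }

    // Edges ending in the given sink; edges that target other sinks are never returned.
    List<LineageEdge> edgesInto(LineageVertex sink) {
        List<LineageEdge> result = new ArrayList<>();
        for (LineageEdge edge : edges) {
            if (edge.sink().equals(sink)) {
                result.add(edge);
            }
        }
        return result;
    }
}
```

With this shape, two sources joined into one sink are simply two edges sharing the same sink vertex, and the environment, not an individual sink, owns the full edge list.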

Best,
Leonard  


> On Jul 5, 2023, at 5:26 PM, Shammon FY <zjur...@gmail.com> wrote:
> 
> Hi Jing,
> 
> Thanks for your feedback.
> 
> >> 1. TableColumnLineageRelation#sinkColumn() should return
> >> TableColumnLineageEntity instead of String, right?
> 
> `sinkColumn()` returns a `String`, which is the column name in the
> sink connector. I found that the name `TableColumnLineageEntity` may
> cause ambiguity, so I have renamed it to `TableColumnSourceLineageEntity`.
> In my mind, `TableColumnLineageRelation` represents the lineage of one
> sink column, and each sink column may be computed from multiple sources and columns.
> I use `TableColumnSourceLineageEntity` to hold each source and its
> columns for that sink column, so `TableColumnLineageRelation` has a sink
> column name and a list of `TableColumnSourceLineageEntity`.
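A minimal sketch of the structure described above, using simplified records as stand-ins for the proposed interfaces (the field names and the `sourceColumnCount` helper are assumptions for illustration): one relation per sink column, each holding the source tables and the columns of each table that feed it.

```java
import java.util.List;

// Stand-in: one source table and the columns of it that feed a sink column.
record TableColumnSourceLineageEntity(String table, List<String> columns) {}

// Stand-in: the lineage of a single sink column.
record TableColumnLineageRelation(String sinkColumn, List<TableColumnSourceLineageEntity> sources) {

    // Hypothetical helper: how many source columns feed this sink column in total.
    int sourceColumnCount() {
        int count = 0;
        for (TableColumnSourceLineageEntity source : sources) {
            count += source.columns().size();
        }
        return count;
    }
}
```

For example, a sink column `total` computed from `price` and `qty` of table `orders` plus `fee` of table `fees` is one relation with two source entries, the first of which carries two columns.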
> 
> >> 2. Since LineageRelation already contains all information to build the
> >> lineage between sources and sink, do we still need to set the LineageEntity
> >> in the source?
> 
> The lineage interface of `DataStream` is very flexible. We have added
> `setLineageEntity` to the source to constrain and verify user behavior,
> ensuring that users do not add non-existent sources to the lineage.
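The kind of check this enables can be sketched as follows, assuming registered sources are identified by name; `LineageValidator` and `validate` are hypothetical names, not part of the FLIP. Any lineage source declared for a sink that was never registered as a source of the job is rejected.

```java
import java.util.List;
import java.util.Set;

// Hypothetical validator: rejects lineage that references sources the job never registered.
final class LineageValidator {
    private final Set<String> registeredSources;

    LineageValidator(Set<String> registeredSources) {
        this.registeredSources = Set.copyOf(registeredSources);
    }

    void validate(List<String> declaredLineageSources) {
        for (String source : declaredLineageSources) {
            if (!registeredSources.contains(source)) {
                throw new IllegalArgumentException(
                        "Lineage source '" + source + "' is not a registered source of this job");
            }
        }
    }
}
```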
> 
> >> 3. About the "Entity" and "Relation" naming, I was confused too, like
> >> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> >> which contains multiple LineageEdge?
> 
> We referred to `Atlas` for the lineage naming: it uses `Entity` and
> `Relation` to represent the lineage relationship, and another metadata
> service, `Datahub`, uses `DataSet` to represent the entity. I think `Entity`
> and `Relation` are better names for lineage. What do you think?
> 
> Best,
> Shammon FY
> 
> 
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <j...@ververica.com.invalid> wrote:
> 
>> Hi Shammon,
>> 
>> Thanks for your proposal. After reading the FLIP, I'd like to ask
>> some questions to make sure we are on the same page. Thanks!
>> 
>> 1. TableColumnLineageRelation#sinkColumn() should return
>> TableColumnLineageEntity instead of String, right?
>> 
>> 2. Since LineageRelation already contains all information to build the
>> lineage between sources and sink, do we still need to set the LineageEntity
>> in the source?
>> 
>> 3. About the "Entity" and "Relation" naming, I was confused too, like
>> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
>> which contains multiple LineageEdge? E.g., multiple sources joined into one
>> sink, or edges between columns from one or more tables, etc.
>> 
>> Best regards,
>> Jing
>> 
>> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zjur...@gmail.com> wrote:
>> 
>>> Hi yuxia and Yun,
>>> 
>>> Thanks for your input.
>>> 
>>> For yuxia:
>>>> 1: What kinds of JobStatus will `JobExecutionStatusEvent` include?
>>> 
>>> At present, we only need to notify the listener when a job reaches a
>>> terminal state, but I think it makes sense to add generic `oldStatus` and
>>> `newStatus` to the listener so that users can update the job state in their
>>> service as needed.
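A sketch of how a listener might use generic `oldStatus`/`newStatus` fields, assuming a simplified local `JobStatus` enum and event record (Flink's own `JobStatus` has more states) and a hypothetical `LineageCleanupListener`: it only acts on the transition into a terminal state.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a job status enum.
enum JobStatus {
    CREATED, RUNNING, FAILING, FAILED, CANCELED, FINISHED;

    boolean isTerminal() {
        return this == FAILED || this == CANCELED || this == FINISHED;
    }
}

// Stand-in for the proposed event carrying both the previous and the new status.
record JobExecutionStatusEvent(String jobId, JobStatus oldStatus, JobStatus newStatus) {}

// Hypothetical listener that records jobs once they enter a terminal state.
final class LineageCleanupListener {
    final List<String> terminatedJobs = new ArrayList<>();

    void onEvent(JobExecutionStatusEvent event) {
        if (!event.oldStatus().isTerminal() && event.newStatus().isTerminal()) {
            terminatedJobs.add(event.jobId() + ":" + event.newStatus());
        }
    }
}
```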
>>> 
>>>> 2: I'm really confused about the `config()` included in `LineageEntity`.
>>>> Where does it come from and what is it for?
>>> 
>>> The `config` in `LineageEntity` lets users get the options of source
>>> and sink connectors. As in the examples in the FLIP, users can add
>>> server/group/topic information to the config for Kafka and create lineage
>>> entities for `DataStream` jobs; listeners can then use this information
>>> to identify the same connector across different jobs. In addition, the
>>> `config` in `TableLineageEntity` is the same as `getOptions` in
>>> `CatalogBaseTable`.
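A minimal sketch of how a listener could recognize the same Kafka dataset across jobs from `config`, assuming the config is a string map with `bootstrap.servers` and `topic` keys (the key names and the `KafkaConnectorIdentity` helper are assumptions; the FLIP examples may use different keys).

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: derives a stable identity for a Kafka dataset from a lineage config map.
final class KafkaConnectorIdentity {
    private KafkaConnectorIdentity() {}

    static String of(Map<String, String> config) {
        // Assumed keys; every job must populate them consistently for identities to match.
        String servers = Objects.requireNonNull(config.get("bootstrap.servers"), "bootstrap.servers");
        String topic = Objects.requireNonNull(config.get("topic"), "topic");
        return "kafka://" + servers + "/" + topic;
    }
}
```

Whether the consumer group belongs in the key depends on whether you want to identify the dataset or the consumer; this sketch identifies the dataset, so two jobs reading the same topic with different groups map to the same identity.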
>>> 
>>>> 3: Regardless of whether `inputChangelogMode` in `TableSinkLineageEntity` is
>>>> needed or not, since `TableSinkLineageEntity` contains
>>>> `inputChangelogMode`, why doesn't `TableSourceLineageEntity` contain a
>>>> changelog mode?
>>> 
>>> At present, we do not actually use the changelog mode. It can be removed,
>>> and I have updated the FLIP.
>>> 
>>>> Btw, since a lot of interfaces are proposed, I think it would be better to
>>>> give an example of how to implement a listener in this FLIP to help us
>>>> understand the interfaces better.
>>> 
>>> I have added an example to the FLIP, and the related interfaces and
>>> examples are in branch [1].
>>> 
>>> For Yun:
>>>> I have one more question about the lookup-join dim tables: this FLIP does
>>>> not seem to cover them. Will they become part of
>>>> List<LineageEntity> sources(), or will another interface be added?
>>> 
>>> You're right: lookup-join dim tables are currently not considered in the
>>> 'Proposed Changes' section of this FLIP. But the lineage interface is
>>> generic, and in the future we can add a `TableLookupSourceLineageEntity`
>>> that implements `TableSourceLineageEntity` without modifying the public
>>> interface.
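A sketch of why such an extension needs no public-interface change, assuming simplified stand-ins for `TableSourceLineageEntity` and a future `TableLookupSourceLineageEntity` (the `tableName()` method, `ScanSourceEntity` and the `LineageReport` helper are hypothetical): listener code written against the base interface keeps working, and newer code can opt in to telling lookup tables apart.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the proposed source entity interface.
interface TableSourceLineageEntity {
    String tableName();
}

// Hypothetical regular (scan) source entity.
record ScanSourceEntity(String tableName) implements TableSourceLineageEntity {}

// Possible future subtype for lookup-join dimension tables.
record TableLookupSourceLineageEntity(String tableName) implements TableSourceLineageEntity {}

final class LineageReport {
    private LineageReport() {}

    // Listener code written before the subtype existed: still compiles and runs unchanged.
    static List<String> tableNames(List<? extends TableSourceLineageEntity> sources) {
        List<String> names = new ArrayList<>();
        for (TableSourceLineageEntity source : sources) {
            names.add(source.tableName());
        }
        return names;
    }

    // Newer code that opts in to distinguishing lookup tables.
    static long lookupCount(List<? extends TableSourceLineageEntity> sources) {
        return sources.stream()
                .filter(source -> source instanceof TableLookupSourceLineageEntity)
                .count();
    }
}
```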
>>> 
>>>> By the way, if you want to focus on job lineage instead of data column
>>>> lineage in this FLIP, why must we introduce so many column-lineage-related
>>>> interfaces here?
>>> 
>>> The lineage information in SQL jobs includes table lineage and column
>>> lineage. Although SQL jobs currently do not support column lineage, we
>>> would like to support it in the next step. So we have considered the
>>> table lineage and column lineage interfaces together here and defined
>>> both of them clearly.
>>> 
>>> 
>>> [1] https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>>> 
>>> Best,
>>> Shammon FY
>>> 
>>> 
>>> On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <myas...@live.com> wrote:
>>> 
>>>> Hi Shammon,
>>>> 
>>>> I like the idea in general; it will help to analyze job lineage for
>>>> both Flink SQL and Flink jar jobs in production environments.
>>>> 
>>>> Regarding Qingsheng's concern, I prefer the name JobType over
>>>> RuntimeExecutionMode, as the latter is not easy for users to understand.
>>>> 
>>>> I have one more question about the lookup-join dim tables: this FLIP does
>>>> not seem to cover them. Will they become part of
>>>> List<LineageEntity> sources(), or will another interface be added?
>>>> 
>>>> By the way, if you want to focus on job lineage instead of data column
>>>> lineage in this FLIP, why must we introduce so many column-lineage-related
>>>> interfaces here?
>>>> 
>>>> 
>>>> Best
>>>> Yun Tang
>>>> ________________________________
>>>> From: Shammon FY <zjur...@gmail.com>
>>>> Sent: Sunday, June 25, 2023 16:13
>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>> Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
>>>> 
>>>> Hi Qingsheng,
>>>> 
>>>> Thanks for your valuable feedback.
>>>> 
>>>>> 1. Is there any specific use case to expose the batch / streaming info to
>>>>> listeners or meta services?
>>>> 
>>>> I agree with you that Flink is evolving towards batch-streaming
>>>> unification, but the lifecycles of batch and streaming jobs are different.
>>>> If a job processes a bounded dataset, it ends after completing the data
>>>> processing; otherwise, it runs for a long time. In our scenario, we
>>>> regularly schedule some Flink jobs to process bounded datasets and, when
>>>> the jobs finish, add job information such as the scheduled timestamp and
>>>> execution duration to the lineage of these "batch" jobs, which is
>>>> different from "streaming" jobs. Currently Flink uses `RuntimeExecutionMode`
>>>> and `existsUnboundedSource` in `StreamGraph` and `StreamGraphGenerator`
>>>> to determine `JobType` and distinguish jobs. We can mark `JobType` as
>>>> `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag. What do
>>>> you think?
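A sketch of the "`RuntimeExecutionMode` plus a boolean flag" alternative, using local copies of the two enums rather than Flink's classes (the `JobTypes.derive` helper is hypothetical). In `AUTOMATIC` mode a job is treated as batch only when none of its sources is unbounded, which is how Flink documents the automatic choice.

```java
// Local copy of the execution modes (Flink's RuntimeExecutionMode has the same three values).
enum RuntimeExecutionMode { STREAMING, BATCH, AUTOMATIC }

// Local stand-in for the job type a listener would see.
enum JobType { STREAMING, BATCH }

final class JobTypes {
    private JobTypes() {}

    // Hypothetical helper: derive the job type from the configured mode and source boundedness.
    static JobType derive(RuntimeExecutionMode mode, boolean existsUnboundedSource) {
        return switch (mode) {
            case BATCH -> JobType.BATCH;
            case STREAMING -> JobType.STREAMING;
            case AUTOMATIC -> existsUnboundedSource ? JobType.STREAMING : JobType.BATCH;
        };
    }
}
```

Exposing the mode and the flag separately lets a listener reproduce this decision itself, at the cost of duplicating the rule outside Flink.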
>>>> 
>>>>> 2. it’s better to be more specific here to tell users what information
>>>>> they could expect to see here, instead of just a “job configuration” as
>>>>> described in JavaDoc.
>>>> 
>>>> Thanks, I have updated the doc in the FLIP.
>>>> 
>>>>> 3. About the IO executor in JobStatusChangedListenerFactory.Context.
>>>> 
>>>> I have updated the docs for the IO executor in
>>>> `JobStatusChangedListenerFactory.Context`: it is a regular thread pool that
>>>> executes submitted tasks in parallel. Users can submit tasks to the
>>>> executor, which ensures that the submitted tasks are executed before the
>>>> job exits.
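A minimal sketch of a listener using such an executor, assuming the context hands out a plain `java.util.concurrent.ExecutorService` (the `ReportingListener` class and its method are hypothetical). Because submitted tasks may run in parallel, shared state is kept in a `ConcurrentHashMap`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

// Hypothetical listener that offloads lineage reporting to a provided IO executor.
final class ReportingListener {
    private final ExecutorService ioExecutor;
    // Tasks may run concurrently, so shared state must be thread-safe.
    final Map<String, String> reported = new ConcurrentHashMap<>();

    ReportingListener(ExecutorService ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    void onJobStatusChanged(String jobId, String newStatus) {
        // Do not block the caller: the (potentially slow) report runs on the IO executor.
        ioExecutor.execute(() -> reported.put(jobId, newStatus));
    }
}
```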
>>>> 
>>>>> 4. I don’t quite get the LineageRelationEntity, which is just a list of
>>>>> LineageEntity.
>>>> 
>>>> In the initial idea, `LineageRelationEntity` is used by `DataStream` jobs
>>>> to set additional lineage information besides the sources. For example,
>>>> there are table and column lineages in SQL jobs. When we build a
>>>> `DataStream` job with a table source and sink, we can add table lineage
>>>> with the following method:
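>>>> ```
>>>> public class DataStreamSink<T> {
>>>>    // Proposed method: declare the lineage sources that feed this sink.
>>>>    public DataStreamSink<T> setLineageSources(LineageEntity... sources) {
>>>>        // ... record the sources for lineage listeners ...
>>>>        return this;
>>>>    }
>>>> }
>>>> ```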
>>>> But we cannot set column lineage for the above sink, and for the sake of
>>>> generality, we do not want to add a method similar to
>>>> `addLineageColumn(...)` in `DataStreamSink`. So I put this information into
>>>> `LineageRelationEntity` so that SQL and DataStream jobs can be consistent.
>>>> But as you mentioned, this approach does indeed lead to ambiguity and
>>>> complexity. So my current idea is to add a `setLineageRelation` method
>>>> to `DataStreamSink` directly, without `LineageRelationEntity`. I have
>>>> updated the FLIP; please help review it again, thanks.
>>>> 
>>>>> 5. I can’t find the definition of CatalogContext in the current code base
>>>>> and Flink, which appears in the TableLineageEntity.
>>>> 
>>>> CatalogContext is defined in FLIP-294, and I have updated the FLIP.
>>>> 
>>>>> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
>>>>> (the “override” is quite confusing). I’m wondering if these are necessary
>>>>> for meta services, as they are actually concepts defined in the runtime
>>>>> level of Flink Table / SQL.
>>>> 
>>>> The information in `TableSinkLineageEntity`, such as `ModifyType`,
>>>> `ChangelogMode` and `override`, is mainly used for verification and
>>>> display. For example, Flink currently supports `INSERT`, `DELETE` and
>>>> `UPDATE`, but we only want to report and update lineage for `INSERT` jobs
>>>> in our streaming & batch ETL, and we display the `override` information
>>>> on the UI.
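A sketch of the filtering described above, assuming a simplified local `ModifyType` enum and a hypothetical `SinkInfo` record standing in for `TableSinkLineageEntity` (neither is a Flink class): only `INSERT` sinks are reported, and the `override` flag is passed through for display.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the modification kinds a table sink can receive.
enum ModifyType { INSERT, UPDATE, DELETE }

// Hypothetical stand-in for the sink-side lineage information discussed above.
record SinkInfo(String table, ModifyType modifyType, boolean override) {}

final class InsertOnlyLineageFilter {
    private InsertOnlyLineageFilter() {}

    // Keep only INSERT sinks; UPDATE/DELETE jobs do not update lineage in this scenario.
    static List<String> reportable(List<SinkInfo> sinks) {
        List<String> result = new ArrayList<>();
        for (SinkInfo sink : sinks) {
            if (sink.modifyType() == ModifyType.INSERT) {
                result.add(sink.table() + (sink.override() ? " [override]" : ""));
            }
        }
        return result;
    }
}
```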
>>>> 
>>>> 
>>>> Best,
>>>> Shammon FY
>>>> 
>>>> 
>>>> On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
>>>> 
>>>>> Hi Shammon,
>>>>> 
>>>>> Thanks for starting this FLIP! Data lineage is a very important topic,
>>>>> which has been missing for a long time in Flink. I have some questions
>>>>> about the FLIP.
>>>>> 
>>>>> About events and listeners:
>>>>> 
>>>>> 1. I’m not sure if it is necessary to expose JobType in JobCreatedEvent.
>>>>> This is an internal class in flink-runtime, and I think the correct API
>>>>> should be RuntimeExecutionMode. Furthermore, I think the boundary between
>>>>> batch and streaming becomes much more vague as Flink is evolving towards
>>>>> batch-streaming unification, so I’m concerned about exposing JobType as a
>>>>> public API. Is there any specific use case to expose the batch / streaming
>>>>> info to listeners or meta services?
>>>>> 
>>>>> 2. Currently JobCreatedEvent gives a Configuration, which is quite
>>>>> ambiguous. To be honest, the configuration is quite a mess in Flink, so
>>>>> maybe it’s better to be more specific here to tell users what information
>>>>> they could expect to see here, instead of just a “job configuration” as
>>>>> described in JavaDoc.
>>>>> 
>>>>> 3. JobStatusChangedListenerFactory.Context provides an IO executor. I think
>>>>> more information should be provided here, such as which threading model
>>>>> this executor promises, and whether the user should care about concurrency
>>>>> issues. Otherwise I prefer not to provide a utility that no one dares to
>>>>> use safely, and to leave it to users to choose their implementation.
>>>>> 
>>>>> About lineage:
>>>>> 
>>>>> 4. I don’t quite get the LineageRelationEntity, which is just a list of
>>>>> LineageEntity. Could you elaborate more on this class? In my naive
>>>>> understanding, the lineage is shaped as a DAG, where vertices are sources
>>>>> and sinks (LineageEntity) and edges are connections between them
>>>>> (LineageRelation), so a name mixing these two concepts is a bit
>>>>> confusing.
>>>>> 
>>>>> 5. I can’t find the definition of CatalogContext in the current code base
>>>>> and Flink, which appears in the TableLineageEntity.
>>>>> 
>>>>> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
>>>>> (the “override” is quite confusing). I’m wondering if these are necessary
>>>>> for meta services, as they are actually concepts defined in the runtime
>>>>> level of Flink Table / SQL.
>>>>> 
>>>>> Best,
>>>>> Qingsheng
>>>>> 
>>>>> On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>> 
>>>>>> Hi devs,
>>>>>> 
>>>>>> Is there any comment or feedback on this FLIP? Hope to hear from you,
>>>>>> thanks
>>>>>> 
>>>>>> Best,
>>>>>> Shammon FY
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi devs,
>>>>>>> 
>>>>>>> I would like to start a discussion on FLIP-314: Support Customized Job
>>>>>>> Lineage Listener [1], which is the next stage of FLIP-294 [2]. Flink
>>>>>>> streaming and batch jobs create lineage dependencies between sources and
>>>>>>> sinks, and users can manage their data and jobs according to this lineage
>>>>>>> information. For example, when there is a delay in Flink ETL or data,
>>>>>>> users can easily trace the problematic jobs and affected data. On the
>>>>>>> other hand, when users need to correct data or debug, they can perform
>>>>>>> operations based on lineage too.
>>>>>>> 
>>>>>>> In FLIP-314 we want to introduce lineage-related interfaces for Flink so
>>>>>>> that users can create customized job status listeners. When the job
>>>>>>> status changes, users can get the job status and information to add,
>>>>>>> update or delete lineage.
>>>>>>> 
>>>>>>> Looking forward to your feedback, thanks.
>>>>>>> 
>>>>>>> 
>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>>>>>>> 
>>>>>>> Best,
>>>>>>> Shammon FY
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
