Hi Feng,

Thanks for your input.

> 1. we can add a lineage interface like `supportReportLineage`

That's a great idea, thanks very much. It will let users report lineage for
existing connectors in DataStream jobs without any additional operations. I
will add this interface to the FLIP later; please help to review it, thanks.
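A minimal sketch of how Feng's suggestion might look, assuming a simplified model. The interface name `SupportsReportLineage` (modeled on the suggested `supportReportLineage`), its `reportLineage()` method, and the `LineageEntity`, `KafkaLikeSource` and `LineageCollector` types are all hypothetical placeholders, not the FLIP's or Flink's API. The idea is that a connector implementing the interface describes its own lineage, so the job needs no extra calls while building the DAG:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified lineage entity: a name plus connector options.
final class LineageEntity {
    final String name;
    final Map<String, String> config;

    LineageEntity(String name, Map<String, String> config) {
        this.name = name;
        this.config = Collections.unmodifiableMap(new HashMap<>(config));
    }
}

// Hypothetical mix-in interface: connectors implement it to describe their own lineage.
interface SupportsReportLineage {
    List<LineageEntity> reportLineage();
}

// Hypothetical Kafka-like source that knows its own servers and topic.
final class KafkaLikeSource implements SupportsReportLineage {
    private final String servers;
    private final String topic;

    KafkaLikeSource(String servers, String topic) {
        this.servers = servers;
        this.topic = topic;
    }

    @Override
    public List<LineageEntity> reportLineage() {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", servers);
        config.put("topic", topic);
        return Collections.singletonList(new LineageEntity("kafka:" + topic, config));
    }
}

final class LineageCollector {
    // Collects lineage from every source that opts in; other sources are skipped,
    // so custom sources would still need their lineage set manually.
    static List<LineageEntity> collect(List<Object> sources) {
        List<LineageEntity> result = new ArrayList<>();
        for (Object source : sources) {
            if (source instanceof SupportsReportLineage) {
                result.addAll(((SupportsReportLineage) source).reportLineage());
            }
        }
        return result;
    }
}
```

Built-in connectors such as Kafka and JDBC would implement the interface once, and the framework could collect lineage automatically for every DataStream job that uses them.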

> 2. it is relatively easy to obtain column lineage through Calcite
> MetaQuery API

It would be helpful if Calcite's metadata already provides column lineage. I
think we can discuss it and give a proposal in the column lineage FLIP.

Best,
Shammon FY



On Wednesday, June 28, 2023, Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Shammon
> Thank you for proposing this FLIP. I think Flink job lineage is a very
> useful feature.
> I have a few questions:
>
> 1. For DataStream jobs, users need to set up lineage relationships when
> building DAGs for their custom sources and sinks.
> However, for some common connectors such as the Kafka and JDBC
> connectors, we can add a lineage interface like `supportReportLineage`, so
> that these connectors can implement it.
> This way, lineage can be reported automatically for DataStream jobs.
> What do you think?
>
>
> 2. From the current design, it seems that we need to analyze column lineage
> through the pipeline. As far as I know, it is relatively easy to obtain
> column lineage through the Calcite MetaQuery API.
> Would you consider using this approach? Or do we need to implement another
> parsing process based on the pipeline?
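> ```
> RelMetadataQuery metadataQuery = relNode.getCluster().getMetadataQuery();
> // Origins of output column i of inputRel; null if they cannot be determined
> Set<RelColumnOrigin> origins = metadataQuery.getColumnOrigins(inputRel, i);
> ```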
> Best,
> Feng
>
>
> On Sun, Jun 25, 2023 at 8:06 PM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi yuxia and Yun,
> >
> > Thanks for your input.
> >
> > For yuxia:
> > > 1: What kinds of JobStatus will `JobExecutionStatusEvent` include?
> >
> > At present, we only need to notify the listener when a job terminates,
> > but I think it makes sense to add generic `oldStatus` and `newStatus` to
> > the listener so that users can update the job state in their service as
> > needed.
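A minimal sketch of a listener that receives generic `oldStatus`/`newStatus` values but only acts on transitions into a terminal state. The `JobStatus` enum here is a simplified local stand-in (Flink's real `JobStatus` has more states), and `JobStatusChangedEvent`, `JobStatusChangedListener` and `TerminalStatusListener` are hypothetical shapes for illustration, not the FLIP's final API:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's JobStatus; only a subset of states is modeled.
enum JobStatus {
    CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED;

    boolean isTerminal() {
        return this == FAILED || this == CANCELED || this == FINISHED;
    }
}

// Hypothetical event carrying both the previous and the new status.
final class JobStatusChangedEvent {
    final String jobId;
    final JobStatus oldStatus;
    final JobStatus newStatus;

    JobStatusChangedEvent(String jobId, JobStatus oldStatus, JobStatus newStatus) {
        this.jobId = jobId;
        this.oldStatus = oldStatus;
        this.newStatus = newStatus;
    }
}

interface JobStatusChangedListener {
    void onEvent(JobStatusChangedEvent event);
}

// Example listener: records a lineage update only when a job terminates.
final class TerminalStatusListener implements JobStatusChangedListener {
    final List<String> updates = new ArrayList<>();

    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (!event.oldStatus.isTerminal() && event.newStatus.isTerminal()) {
            updates.add(event.jobId + ": " + event.oldStatus + " -> " + event.newStatus);
        }
    }
}
```

Because the event carries both states, a service that wants more than termination (for example, tracking `RUNNING`) only changes its own filter, not the interface.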
> >
> > > 2: I'm really confused about the `config()` included in
> > > `LineageEntity`. Where does it come from and what is it for?
> >
> > The `config` in `LineageEntity` lets users get the options of source and
> > sink connectors. As in the examples in the FLIP, users can add
> > server/group/topic information to the config for Kafka and create lineage
> > entities for `DataStream` jobs; the listeners can then use this
> > information to identify the same connector across different jobs. For
> > table entities, the `config` in `TableLineageEntity` is the same as
> > `getOptions` in `CatalogBaseTable`.
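To illustrate how a listener might use `config` to recognize the same Kafka connector across jobs, here is a minimal sketch. The `LineageEntity` shape, the `LineageIndex` helper, and the choice of `bootstrap.servers` plus `topic` as the dataset key are assumptions for illustration. The consumer group is deliberately left out of the key in this sketch, since different jobs reading the same topic usually use different groups:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified lineage entity with connector options in `config`.
final class LineageEntity {
    final String jobName;
    final Map<String, String> config;

    LineageEntity(String jobName, Map<String, String> config) {
        this.jobName = jobName;
        this.config = config;
    }

    // Assumed identity of the underlying dataset: cluster address plus topic.
    String datasetKey() {
        return config.get("bootstrap.servers") + "/" + config.get("topic");
    }
}

// Hypothetical index a listener could keep in its metadata service.
final class LineageIndex {
    private final Map<String, List<String>> jobsByDataset = new HashMap<>();

    void register(LineageEntity entity) {
        jobsByDataset.computeIfAbsent(entity.datasetKey(), k -> new ArrayList<>())
                .add(entity.jobName);
    }

    List<String> jobsFor(String datasetKey) {
        return jobsByDataset.getOrDefault(datasetKey, new ArrayList<>());
    }
}
```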
> >
> > > 3: Regardless of whether `inputChangelogMode` in `TableSinkLineageEntity`
> > > is needed or not, since `TableSinkLineageEntity` contains
> > > `inputChangelogMode`, why doesn't `TableSourceLineageEntity` contain a
> > > changelog mode?
> >
> > At present, we do not actually use the changelog mode, so I have removed
> > it and updated the FLIP.
> >
> > > Btw, since a lot of interfaces are proposed, I think it would be better
> > > to give an example of how to implement a listener in this FLIP to help
> > > us understand the interfaces better.
> >
> > I have added an example to the FLIP; the related interfaces and examples
> > are in branch [1].
> >
> > For Yun:
> > > I have one more question on the lookup-join dim tables: it seems this
> > > FLIP does not touch them. Will they become part of
> > > List<LineageEntity> sources(), or will another interface be added?
> >
> > You're right, lookup join dim tables are currently not considered in the
> > 'Proposed Changes' section of this FLIP. But the lineage interface is
> > universal, and in the future we can add a `TableLookupSourceLineageEntity`
> > which implements `TableSourceLineageEntity` without modifying the public
> > interface.
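A minimal sketch of why lookup tables could be added later without changing the public interface: a new entity type implements the existing source interface, and listener code written only against `TableSourceLineageEntity` keeps working unchanged. The interfaces below are heavily simplified stand-ins, and the `lookupKeys()` method and `SourceNames` helper are hypothetical additions, not part of the FLIP:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

interface LineageEntity {
    String name();
}

// Simplified stand-in for the FLIP's table source entity.
interface TableSourceLineageEntity extends LineageEntity {}

// Possible future entity for lookup-join dimension tables; `lookupKeys` is hypothetical.
final class TableLookupSourceLineageEntity implements TableSourceLineageEntity {
    private final String table;
    private final List<String> lookupKeys;

    TableLookupSourceLineageEntity(String table, List<String> lookupKeys) {
        this.table = table;
        this.lookupKeys = Collections.unmodifiableList(new ArrayList<>(lookupKeys));
    }

    @Override
    public String name() {
        return table;
    }

    List<String> lookupKeys() {
        return lookupKeys;
    }
}

final class SourceNames {
    // Existing listener code that only knows the source interface;
    // it picks up lookup tables without any change.
    static List<String> of(List<? extends TableSourceLineageEntity> sources) {
        List<String> names = new ArrayList<>();
        for (TableSourceLineageEntity source : sources) {
            names.add(source.name());
        }
        return names;
    }
}
```

Listeners that care about the difference can check `instanceof TableLookupSourceLineageEntity`; others simply see one more source.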
> >
> > > By the way, if you want to focus on job lineage instead of data column
> > > lineage in this FLIP, why must we introduce so many column-lineage
> > > related interfaces here?
> >
> > The lineage information in SQL jobs includes table lineage and column
> > lineage. Although SQL jobs do not support column lineage yet, we would
> > like to support it in the next step. So we have considered the table
> > lineage and column lineage interfaces together here and defined both of
> > them clearly.
> >
> >
> > [1]
> > https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
> >
> > Best,
> > Shammon FY
> >
> >
> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <myas...@live.com> wrote:
> >
> > > Hi Shammon,
> > >
> > > I like the idea in general, and it will help to analyze job lineage for
> > > both Flink SQL and Flink jar jobs in production environments.
> > >
> > > Regarding Qingsheng's concern, I prefer the name JobType to
> > > RuntimeExecutionMode, as the latter is not easy for users to understand.
> > >
> > > I have one more question on the lookup-join dim tables: it seems this
> > > FLIP does not touch them. Will they become part of List<LineageEntity>
> > > sources(), or will another interface be added?
> > >
> > > By the way, if you want to focus on job lineage instead of data column
> > > lineage in this FLIP, why must we introduce so many column-lineage
> > > related interfaces here?
> > >
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Shammon FY <zjur...@gmail.com>
> > > Sent: Sunday, June 25, 2023 16:13
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
> > >
> > > Hi Qingsheng,
> > >
> > > Thanks for your valuable feedback.
> > >
> > > > 1. Is there any specific use case to expose the batch / streaming info
> > > > to listeners or meta services?
> > >
> > > I agree with you that Flink is evolving towards batch-streaming
> > > unification, but the lifecycles of the two kinds of jobs are different.
> > > If a job processes a bounded dataset, it ends after completing the data
> > > processing; otherwise, it runs for a long time. In our scenario, we
> > > regularly schedule some Flink jobs to process bounded datasets, and when
> > > these "batch" jobs finish we update job information in the lineage, such
> > > as the scheduled timestamp and execution duration, which is different
> > > from "streaming" jobs. Currently Flink uses `RuntimeExecutionMode` and
> > > `existsUnboundedSource` in `StreamGraph` and `StreamGraphGenerator` to
> > > determine the `JobType` and distinguish jobs. We can mark `JobType` as
> > > `PublicEvolving`, or use `RuntimeExecutionMode` and a boolean flag. What
> > > do you think?
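If `RuntimeExecutionMode` plus a boolean flag were exposed instead of `JobType`, a listener could derive batch vs. streaming itself. A minimal sketch, with a local enum mirroring Flink's `RuntimeExecutionMode` values (`STREAMING`, `BATCH`, `AUTOMATIC`) so it runs standalone; the rule for `AUTOMATIC` (batch only when no source is unbounded) follows the description above, while the `JobKind` enum and `JobKinds.resolve` helper are hypothetical:

```java
// Local mirror of Flink's RuntimeExecutionMode so the sketch runs without Flink.
enum RuntimeExecutionMode { STREAMING, BATCH, AUTOMATIC }

// Hypothetical job classification a lineage listener might store.
enum JobKind { BATCH, STREAMING }

final class JobKinds {
    // AUTOMATIC resolves to batch only when every source is bounded.
    static JobKind resolve(RuntimeExecutionMode mode, boolean existsUnboundedSource) {
        switch (mode) {
            case BATCH:
                return JobKind.BATCH;
            case STREAMING:
                return JobKind.STREAMING;
            default:
                return existsUnboundedSource ? JobKind.STREAMING : JobKind.BATCH;
        }
    }
}
```

The trade-off is that every listener repeats this resolution logic, whereas a public `JobType` would compute it once inside Flink.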
> > >
> > > > 2. it’s better to be more specific here to tell users what information
> > > > they could expect to see here, instead of just a “job configuration” as
> > > > described in JavaDoc.
> > >
> > > Thanks, I have updated the doc in the FLIP.
> > >
> > > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
> > >
> > > I have updated the docs for the IO executor in
> > > `JobStatusChangedListenerFactory.Context`. It is a regular thread pool
> > > that executes submitted tasks in parallel. Users can submit tasks to the
> > > executor, which ensures that the submitted tasks can be executed before
> > > the job exits.
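Assuming the IO executor behaves like a regular thread pool that is drained before the job exits, a listener could offload slow lineage reporting to it instead of blocking the callback. The sketch below uses a plain `ExecutorService` as a stand-in for the executor in `JobStatusChangedListenerFactory.Context`; `AsyncLineageReporter` and its `drain` helper are hypothetical. Because tasks may run in parallel, shared state in the sketch is kept in a synchronized list:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncLineageReporter {
    private final ExecutorService ioExecutor;
    // Tasks may run concurrently, so the shared list must be thread-safe.
    final List<String> reported = Collections.synchronizedList(new ArrayList<>());

    AsyncLineageReporter(ExecutorService ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    // Called from the listener callback; returns immediately.
    void onJobTerminated(String jobId) {
        // In practice this task might call a metadata service.
        ioExecutor.execute(() -> reported.add(jobId));
    }

    // Stand-in for what the runtime would do before exiting: wait for submitted tasks.
    static void drain(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```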
> > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > > LineageEntity.
> > >
> > > In the initial design, `LineageRelationEntity` was used in `DataStream`
> > > jobs to set additional lineage information besides the sources. For
> > > example, SQL jobs have both table and column lineage. When we build a
> > > `DataStream` job with a table source and sink, we can add table lineage
> > > with the following method:
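> > > ```
> > > public class DataStreamSink<T> {
> > >     // Proposed method: attach table-level lineage sources to this sink.
> > >     public DataStreamSink<T> setLineageSources(LineageEntity... sources) {
> > >         return this; // sketch only; a real implementation would store them
> > >     }
> > > }
> > > ```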
> > > But we cannot set column lineage for the above sink, and for the sake of
> > > universality we do not want to add a method similar to
> > > `addLineageColumn(...)` to `DataStreamSink`. So I put this information
> > > into `LineageRelationEntity` so that SQL and DataStream jobs can be
> > > consistent. But as you mentioned, this approach does lead to ambiguity
> > > and complexity. So my current idea is to add a `setLineageRelation`
> > > method to `DataStreamSink` directly, without `LineageRelationEntity`. I
> > > have updated the FLIP; please help to review it again, thanks.
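A minimal sketch of what attaching relations directly to a sink might look like, following Qingsheng's framing of lineage as a DAG whose vertices are sources and sinks and whose edges are the relations between them. `LineageVertex`, `LineageEdge`, `LineageSink` and this `setLineageRelation` signature are hypothetical, heavily simplified stand-ins; the FLIP defines the actual method on `DataStreamSink`. Each edge optionally carries column mappings, so table and column lineage travel together:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Vertex: a source or sink table (hypothetical, simplified).
final class LineageVertex {
    final String table;

    LineageVertex(String table) {
        this.table = table;
    }
}

// Edge: a relation from one source into the sink, with optional column mappings
// (sink column -> source column).
final class LineageEdge {
    final LineageVertex source;
    final Map<String, String> columnMapping;

    LineageEdge(LineageVertex source, Map<String, String> columnMapping) {
        this.source = source;
        this.columnMapping = Collections.unmodifiableMap(new HashMap<>(columnMapping));
    }
}

// Stand-in for DataStreamSink with a hypothetical setLineageRelation method.
final class LineageSink {
    final LineageVertex target;
    private final List<LineageEdge> relations = new ArrayList<>();

    LineageSink(LineageVertex target) {
        this.target = target;
    }

    LineageSink setLineageRelation(LineageEdge... edges) {
        Collections.addAll(relations, edges);
        return this;
    }

    List<LineageEdge> relations() {
        return Collections.unmodifiableList(relations);
    }
}
```

Keeping entities (vertices) and relations (edges) as separate types avoids the name mixing the two concepts that Qingsheng found confusing.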
> > >
> > > > 5. I can’t find the definition of CatalogContext in the current code base
> > > > and Flink, which appears in the TableLineageEntity.
> > >
> > > CatalogContext is defined in FLIP-294, and I have updated the FLIP.
> > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> > > > (the “override” is quite confusing). I’m wondering if these are necessary
> > > > for meta services, as they are actually concepts defined in the runtime
> > > > level of Flink Table / SQL.
> > >
> > > The information in `TableSinkLineageEntity`, such as `ModifyType`,
> > > `ChangelogMode` and `override`, is mainly used for verification and
> > > display. For example, Flink currently supports `INSERT`, `DELETE` and
> > > `UPDATE`; we only want to report and update lineage for `INSERT` jobs in
> > > our streaming & batch ETL, and display the `override` information on the
> > > UI.
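The verification use case could look like the following minimal sketch: lineage is reported only for `INSERT` sinks, and the `override` flag is passed through for display. The `ModifyType` enum here is a local stand-in for the kinds listed above (the FLIP's actual type may differ), and `SinkInfo` and `LineageFilter` are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the INSERT/UPDATE/DELETE kinds mentioned above.
enum ModifyType { INSERT, UPDATE, DELETE }

// Hypothetical, simplified view of a table sink lineage entity.
final class SinkInfo {
    final String table;
    final ModifyType modifyType;
    final boolean override; // flag as exposed in the FLIP's TableSinkLineageEntity

    SinkInfo(String table, ModifyType modifyType, boolean override) {
        this.table = table;
        this.modifyType = modifyType;
        this.override = override;
    }
}

final class LineageFilter {
    // Keep only INSERT sinks; label sinks with the override flag for the UI.
    static List<String> reportable(List<SinkInfo> sinks) {
        List<String> result = new ArrayList<>();
        for (SinkInfo sink : sinks) {
            if (sink.modifyType == ModifyType.INSERT) {
                result.add(sink.override ? sink.table + " (override)" : sink.table);
            }
        }
        return result;
    }
}
```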
> > >
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for starting this FLIP! Data lineage is a very important topic,
> > > > which has been missing for a long time in Flink. I have some questions
> > > > about the FLIP.
> > > >
> > > > About events and listeners:
> > > >
> > > > 1. I’m not sure if it is necessary to expose JobType in JobCreatedEvent.
> > > > This is an internal class in flink-runtime, and I think the correct API
> > > > should be RuntimeExecutionMode. Furthermore, I think the boundary between
> > > > batch and streaming becomes much more vague as Flink is evolving towards
> > > > batch-streaming unification, so I’m concerned about exposing JobType as a
> > > > public API. Is there any specific use case to expose the batch /
> > > > streaming info to listeners or meta services?
> > > >
> > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > > ambiguous. To be honest the configuration is quite a mess in Flink, so
> > > > maybe it’s better to be more specific here to tell users what information
> > > > they could expect to see here, instead of just a “job configuration” as
> > > > described in JavaDoc.
> > > >
> > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor. I
> > > > think more information should be provided here, such as which thread
> > > > model this executor guarantees, and whether users should care about
> > > > concurrency issues. Otherwise I prefer not to provide such a utility that
> > > > no one dares to use safely, and to leave it to users to choose their
> > > > implementation.
> > > >
> > > > About lineage:
> > > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > > LineageEntity. Could you elaborate more on this class? From my naive
> > > > imagination, the lineage is shaped as a DAG, where vertices are sources
> > > > and sinks (LineageEntity) and edges are connections between them
> > > > (LineageRelation), so a name mixing these two concepts is a bit
> > > > confusing.
> > > >
> > > > 5. I can’t find the definition of CatalogContext in the current code base
> > > > and Flink, which appears in the TableLineageEntity.
> > > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> > > > (the “override” is quite confusing). I’m wondering if these are necessary
> > > > for meta services, as they are actually concepts defined in the runtime
> > > > level of Flink Table / SQL.
> > > >
> > > > Best,
> > > > Qingsheng
> > > >
> > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > Are there any comments or feedback on this FLIP? Hope to hear from
> > > > > you, thanks.
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-314: Support Customized Job
> > > > > > Lineage Listener [1], which is the next stage of FLIP-294 [2]. Flink
> > > > > > streaming and batch jobs create lineage dependencies between sources
> > > > > > and sinks, and users can manage their data and jobs according to this
> > > > > > lineage information. For example, when there is a delay in Flink ETL
> > > > > > or data, users can easily trace the problematic jobs and affected
> > > > > > data. On the other hand, when users need to correct data or debug,
> > > > > > they can perform operations based on lineage too.
> > > > > >
> > > > > > In FLIP-314 we want to introduce lineage-related interfaces for Flink
> > > > > > so that users can create customized job status listeners. When the job
> > > > > > status changes, users can get the job status and information to add,
> > > > > > update or delete lineage.
> > > > > >
> > > > > > Looking forward to your feedback, thanks.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > [2]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > >
> > > >
> > >
> >
>
