[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813003#comment-17813003 ] Fang Yong commented on FLINK-31275:
---
Hi [~mobuchowski] [~ZhenqiuHuang], I feel that there are many details that need to be discussed regarding the interfaces. Would it be convenient for you to send an email to me (zjur...@gmail.com) so that we can schedule an offline meeting to discuss them first, and then initiate a new discussion on the dev mailing list? Hope to hear from you, thanks :)

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Fang Yong
> Assignee: Fang Yong
> Priority: Major
>
> FLIP-314 has been accepted
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807671#comment-17807671 ] Martijn Visser commented on FLINK-31275:
---
Looking at the discussions happening here, I think this should be brought back to the dev mailing list instead of discussing it under Jira.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807486#comment-17807486 ] Zhenqiu Huang commented on FLINK-31275:
---
Hi Everyone,

I also want to bump the thread due to internal needs. I feel the OpenLineage community gives a very good suggestion to define an intermediate representation (Dataset) for the metadata of a source/sink. Also, a LineageVertex could definitely have multiple datasets, for example for hybrid source users who read from Kafka first and then switch to Iceberg. Given this, I feel the config should be in the dataset rather than in the LineageVertex. On the other hand, we want to make column lineage possible, so having the query in the dataset will allow a lineage provider to analyze the column relationships. For the input/output schema, we may put it into a facet; it could be optional depending on the connector implementation. What do you think [~zjureel] [~mobuchowski]?

{code:java}
public interface LineageVertex {
    /* List of input (for source) or output (for sink) datasets interacted with by the connector. */
    List<Dataset> datasets();
}

public interface Dataset {
    /* Name for this particular dataset. */
    String name();

    /* Unique name for this dataset's datasource. */
    String namespace();

    /* Query used to generate the dataset, if there is one. */
    String query();

    /* Facets for the lineage vertex to describe the particular information of the dataset. */
    Map<String, Facet> facets();
}
{code}

Facet types could be SchemaFacet and ConfigFacet.
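To make the proposal above concrete, here is a minimal compilable sketch. The generic parameters (lost in Jira's plain-text rendering) and the `SimpleDataset` implementation class are assumptions for illustration; this is not the accepted FLIP-314 API.

```java
import java.util.List;
import java.util.Map;

public class LineageSketch {
    /** A self-contained piece of extension metadata, as discussed in the thread. */
    interface Facet {}

    interface Dataset {
        String name();       // e.g. a topic or table name
        String namespace();  // unique name of the dataset's datasource
        String query();      // query used to generate the dataset, if any
        Map<String, Facet> facets();
    }

    interface LineageVertex {
        /* Input (for source) or output (for sink) datasets interacted with by the connector. */
        List<Dataset> datasets();
    }

    // Hypothetical immutable implementation a connector could return.
    record SimpleDataset(String name, String namespace, String query,
                         Map<String, Facet> facets) implements Dataset {}

    public static void main(String[] args) {
        Dataset ds = new SimpleDataset("orders", "kafka://broker:9092", null, Map.of());
        LineageVertex vertex = () -> List.of(ds);  // single-method interface: lambda works
        System.out.println(vertex.datasets().get(0).namespace());
    }
}
```

Note that with the list living on the vertex, the hybrid-source case mentioned above is just a vertex whose `datasets()` returns one entry per underlying system.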
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799357#comment-17799357 ] Maciej Obuchowski commented on FLINK-31275:
---
Hey [~zjureel], is there any news regarding this feature?
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794165#comment-17794165 ] Maciej Obuchowski commented on FLINK-31275:
---
Hey [~zjureel]. I had an additional idea - for SQL jobs, it would be nice to attach the actual text representation of the query. This would allow data catalogs to accurately show what query created the table. Additionally, is there any news about progress? We are excited about this feature, and are willing to help with testing and implementation.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17790083#comment-17790083 ] Maciej Obuchowski commented on FLINK-31275:
---
> Why would a `LineageVertex` have multiple inputs or outputs? We hope that `LineageVertex` describes a single source or sink, rather than multiple.

I have two good counterexamples, for read and write, where one source or sink describes more than one dataset:
* KafkaSource can read from multiple topics, or even a wildcard pattern.
* Another case is where one company used the JDBC connector sink, and they had a very large number of destination tables (thousands), some of them with rather small amounts of data. The database would not work with a one-connection-per-table model, so I had a fork of the JDBC connector that could dynamically determine which table the connector should write the data to. I tried to contribute that but there was no interest: [https://github.com/apache/flink/pull/15102/files]

Flink is really flexible when it comes to the structure of a job, which should be reflected in the API.

> We introduce `LineageEdge` in this FLIP to describe the relation between sources and sinks instead of adding `input` or `output` in `LineageVertex`.

I think those are two separate things, as different datasets in one source can have different output sinks.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789890#comment-17789890 ] Fang Yong commented on FLINK-31275:
---
[~mobuchowski] Sorry for the late reply. What is the `DataSet` in `LineageVertex` used for? Why would a `LineageVertex` have multiple inputs or outputs? We hope that `LineageVertex` describes a single source or sink, rather than multiple. So I prefer to add a `Map` to `LineageVertex` directly to describe the particular information. We introduce `LineageEdge` in this FLIP to describe the relation between sources and sinks instead of adding `input` or `output` in `LineageVertex`.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786866#comment-17786866 ] Maciej Obuchowski commented on FLINK-31275:
---
[~zjureel] I think this is a step in the right direction.

For `StreamingFacet`, if the intention is to attach it to connectors interfacing with services like Kafka, Kinesis, Pulsar or PubSub, then I believe it needs the ability to describe multiple topics - since, at least for Kafka, you can read from either a list of topics or a wildcard pattern. The same for `SchemaFacet` - the topics can have different schemas. The way that OpenLineage deals with this - and Flink could too - is associating those with the concept of a `Dataset`, rather than the job itself. So, we'd have roughly:

```
public interface LineageVertex {
    /* List of input (for source) or output (for sink) datasets interacted with by the connector. */
    List<Dataset> datasets();

    /* Facets for the lineage vertex to describe the general information of the source/sink. */
    Map<String, Facet> facets();

    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
}

public interface Dataset {
    /* Name for this particular dataset. */
    String name();

    /* Unique name for this dataset's datasource. */
    String namespace();

    /* Facets for the lineage vertex to describe the particular information of the dataset. */
    Map<String, Facet> facets();
}
```
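A compilable sketch of the vertex/dataset split proposed above. The generic parameters (stripped by Jira's rendering) and all concrete class names are illustrative assumptions, not part of the accepted FLIP:

```java
import java.util.List;
import java.util.Map;

public class SplitSketch {
    interface Facet {}

    interface Dataset {
        String name();
        String namespace();
        Map<String, Facet> facets();
    }

    interface LineageVertex {
        List<Dataset> datasets();         // one vertex may cover several datasets
        Map<String, Facet> facets();      // general source/sink information
        Map<String, String> config();     // raw connector options
    }

    record TopicDataset(String name, String namespace, Map<String, Facet> facets)
            implements Dataset {}

    record SimpleVertex(List<Dataset> datasets, Map<String, Facet> facets,
                        Map<String, String> config) implements LineageVertex {}

    public static void main(String[] args) {
        // A single Kafka source subscribed to two topics: one vertex, two datasets.
        LineageVertex vertex = new SimpleVertex(
            List.of(new TopicDataset("clicks", "kafka://broker:9092", Map.of()),
                    new TopicDataset("views", "kafka://broker:9092", Map.of())),
            Map.of(),
            Map.of("properties.bootstrap.servers", "broker:9092"));
        System.out.println(vertex.datasets().size());
    }
}
```

This keeps per-connector config on the vertex while the dataset identity (and per-topic schema facets) can differ per entry, which is the multi-topic point made above.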
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785782#comment-17785782 ] Fang Yong commented on FLINK-31275:
---
[~mobuchowski] Sorry for the late reply, I think I get your point now; thank you for your valuable suggestions. Based on our discussion, I think we can add facets in the `LineageVertex` as follows.

{code:java}
public interface LineageVertex {
    /* Facets for the lineage vertex to describe the information of the source/sink. */
    Map<String, Facet> facets();

    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
}
{code}

We can implement some common facets such as `ConnectorFacet`, `StreamingFacet` and `SchemaFacet`, and we can add new `Facet` implementations as needed in the future. For `TableLineageVertex` we can create facets from the config in the table DDL, which will make the table options meaningful. For datastream jobs we can create facets from the data source and sink.

{code:java}
/** Connector facet has a name and address for the connector, for example, jdbc and the url for the jdbc connector, kafka and the server for the kafka connector. */
public interface ConnectorFacet extends Facet {
    String name();
    String address();
}

/** Facet for a streaming connector. */
public interface StreamingFacet extends Facet {
    String topic();
    String group();
    String format();
    String start();
}

/** Facet for the schema in a connector. */
public interface SchemaFacet extends Facet {
    Map<String, String> fields();
}
{code}

What do you think of this? Thanks
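To see how the facet interfaces sketched above might be populated, here is a compilable example for a Kafka-like connector. All concrete class names, the `Map<String, Facet>` key scheme, and the field-name-to-type schema map are assumptions for illustration:

```java
import java.util.Map;

public class FacetSketch {
    interface Facet {}

    interface ConnectorFacet extends Facet {
        String name();     // e.g. "kafka", "jdbc"
        String address();  // e.g. bootstrap servers, JDBC URL
    }

    interface SchemaFacet extends Facet {
        Map<String, String> fields();  // field name -> type
    }

    // Hypothetical facet implementations a Kafka connector could provide.
    record KafkaConnectorFacet(String name, String address) implements ConnectorFacet {}
    record KafkaSchemaFacet(Map<String, String> fields) implements SchemaFacet {}

    public static void main(String[] args) {
        Map<String, Facet> facets = Map.of(
            "connector", new KafkaConnectorFacet("kafka", "broker:9092"),
            "schema", new KafkaSchemaFacet(Map.of("id", "BIGINT", "name", "STRING")));

        // A listener looks facets up by key and casts to the facet type it knows.
        ConnectorFacet c = (ConnectorFacet) facets.get("connector");
        System.out.println(c.name() + "@" + c.address());
    }
}
```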
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784486#comment-17784486 ] Maciej Obuchowski commented on FLINK-31275:
---
> So, I think our current point of divergence is which level of abstraction the user needs to perceive.

I would differentiate between the "end user" - who just writes job code, whether in DataStream or SQL - the listener developer, and the connector developer. Ideally, the abstraction level for an end user who just works on job-level code is that they would not need to do anything besides configuring the listener and enjoying the lineage graph in their preferred lineage backend.

> In the current FLIP, for DataStream jobs, listener developers need to identify whether the `LineageVertex` is a `KafkaSourceLineageVertex` or a `JdbcLineageVertex`. You mean we need to define another layer, such as the `DataSetConfig` interface, and then the listener developer can identify whether it is a `KafkaDataSetConfig` or a `JdbcDataSetConfig`, right?

For the listener developer, I would argue that for transmitting basic lineage - data source, dataset names, possibly schema and column-level lineage - the developer should be able to get this data utilizing a basic interface built in for this FLIP. So, basic support would mean just recognizing `DataSetConfig` (or having this data in the basic LineageVertex) - without any classes that strongly tie the listener to some particular connectors. This is especially important for authors of generic (not only in-house) listeners, like an OpenLineageListener or perhaps a DatahubListener, that would like to support lineage returned from custom connectors. Connector developers should implement this basic interface, and then all listener implementations would be able to understand the gathered lineage - without even knowledge of the connector.
Basically, instead of an N x M problem where there are N connectors and M listeners and every listener has to have specific code for each connector, we should have a single intermediate interface, so we'd save everyone's time. Then, it would be best if there was a standard way for connectors to extend the returned data structure. This could be inheritance, as the FLIP suggests, but I think a better, though maybe less type-safe, way would be to provide something like `Map<String, Facet>`, where a Facet is just a self-contained, atomic piece of extension metadata - things like information about the output storage system, the connector name and version, or perhaps some metrics about job execution - it's up to the connector developer. I believe it's better because a lack of knowledge of a particular `LineageVertex` subtype doesn't prevent you from getting lineage. So yes, a good comparison is the proposed `TableLineageVertex` - I would just extend this concept to DataStream jobs and provide (optionally?) more metadata, with a slightly different interface for extension.

I want to add that despite some disagreements on this interface, I respect the work you've done on this topic [~zjureel], and I believe even without my points being acknowledged, the interface is a big step forward for better observability of Flink jobs.
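The N x M argument above can be sketched in code: a generic listener needs only the base types, and an unrecognized facet degrades gracefully instead of blocking lineage extraction. All type names here are illustrative assumptions, not the accepted FLIP-314 API:

```java
import java.util.Map;

public class GenericListenerSketch {
    interface Facet {}
    record SchemaFacet(Map<String, String> fields) implements Facet {}
    record Dataset(String namespace, String name, Map<String, Facet> facets) {}

    /** Works for any connector: no KafkaLineageVertex/JdbcLineageVertex casts needed. */
    static String describe(Dataset ds) {
        StringBuilder sb = new StringBuilder(ds.namespace() + "/" + ds.name());
        for (Facet f : ds.facets().values()) {
            if (f instanceof SchemaFacet s) {   // known facet type: use it
                sb.append(" fields=").append(s.fields().size());
            }                                   // unknown facet type: simply skip it
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Dataset ds = new Dataset("postgres://db:5432/shop", "orders",
            Map.of("schema", new SchemaFacet(Map.of("id", "BIGINT"))));
        System.out.println(describe(ds));
    }
}
```

A listener written this way still reports dataset identifiers for a proprietary connector it has never seen, which is the core of the extensibility argument.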
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783147#comment-17783147 ] Fang Yong commented on FLINK-31275:
---
Hi [~mobuchowski], thanks for your reply. I think our ideas are consistent, just at different levels of abstraction. The interface `LineageVertex` is the top interface for connectors in Flink, and we implement `TableLineageVertex` for tables, because a table is a complete definition, including the database, schema, etc. We put the options in the `with` clause into a map, which is consistent with the definition and usage habits of SQL in Flink. For the official Flink connectors, we will implement the `LineageVertex` for `Source` and `InputFormat` in `DataStream` jobs, such as `KafkaSourceLineage`, etc., as we mentioned in the FLIP: `We will implement LineageVertexProvider for the builtin source and sink such as KafkaSource , HiveSource , FlinkKafkaProducerBase and etc.`. End users don't need to implement them. In order to be consistent with the usage habits of tables, we will put the corresponding information into a map when implementing it, and users can obtain it.

So, I think our current point of divergence is which level of abstraction the user needs to perceive. In the current FLIP, for DataStream jobs, listener developers need to identify whether the `LineageVertex` is a `KafkaSourceLineageVertex` or a `JdbcLineageVertex`. You mean we need to define another layer, such as the `DataSetConfig` interface, and then the listener developer can identify whether it is a `KafkaDataSetConfig` or a `JdbcDataSetConfig`, right? Our current use of `LineageVertex` is mainly to consider flexibility and facilitate the addition of returned information in the lineage vertex of the `DataStream`, such as the vector type data source information mentioned in the FLIP example. At the same time, connector maintainers can also easily provide a lineage vertex for customized connectors.
If the connector is in table format, we prefer that users directly provide a TableLineageVertex instance.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782529#comment-17782529 ] Maciej Obuchowski commented on FLINK-31275:
---
[~zjureel] Generally the idea is that the interface should allow for connecting the listener and a particular connector in a scenario where they are written by totally separate people who don't even know of each other. For example, I should be able to write an OpenLineage listener (or Datahub, Atlan...) and it should work - and report lineage - with some proprietary, internal connector that implements the FLIP-314 interface. The way of doing it in the currently proposed and implemented interface requires either
# the listener knowing the LineageVertex subclasses of all possible connectors, or
# the listener providing its own specialized subclass that the connectors would implement.

The problem with the first idea is that it's not possible to provide an extensibility mechanism. If I have to enumerate all the supported connectors, I won't be able to cover some proprietary connector that I don't know the code of - and the connector author won't be able to do it either. The problem with the second idea is that it's pretty much duplicating effort. It's hard to get consensus around a particular interface, and it's even harder to have an implementation of it in open source. Even if this was done, it pretty much forces everyone to use this listener - which I believe is the opposite of the goal of an open interface like FLIP-314.

> 2. For customized source/sink in datastream jobs, we can get source and sink `LineageVertex` implementations from `LineageVertexProvider`. When users implement customized lineage vertex and edge, they need to update them when their connectors are updated.
> IIUC, do you mean we should give an implementation of `LineageVertex` for datastream jobs and users can provide source/sink information there just like `TableLineageVertex` in sql jobs? Then listeners can use the datastream lineage vertex which is similar to the table lineage vertex?

In a way, yes. While datastream jobs are flexible, sources and sinks generally read from and write to known data systems, and those connectors know them. On the other hand, I think the best possible interface wouldn't have `TableLineageVertex` or `DatasetLineageVertex`, just one unified interface that would allow connectors themselves to describe the list of datasets read from and written to, like the one I've posted in a previous comment.

> Due to the flexibility of the source and sink in `DataStream`, we think it's hard to cover all of them, so we just provide `LineageVertex` and `LineageVertexProvider` for them. So we left this flexibility to users and listeners. If a custom connector is a table in a `DataStream` job, users can return `TableLineageVertex` in the `LineageVertexProvider`.

My idea is that connector authors provide those - not end users. End users providing those means the work is duplicated - while the Kafka connector always knows it's reading from some topics, or the JDBC connector knows it's writing to a particular table in a particular database. However, end users should have the ability to enrich this data with some particular information.

> I feel the most painful thing is to infer the schema of the source/sink from a lineage perspective. If the schema info can be provided in the Flink connector, the integration in OpenLineage or even other frameworks will be clean and concise.

Agreed - schema is very important next to the actual dataset identifier. And very important for any possible future column-level lineage work.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782461#comment-17782461 ] Fang Yong commented on FLINK-31275:
---
[~ZhenqiuHuang] Sounds nice to me, I think we can consider this together. If there are some common connectors and relevant schemas for the `DataStream` jobs, we can define these lineage vertexes in Flink, such as a `ColumnsLineageVertex` which has multiple columns with different data types?
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781841#comment-17781841 ] Zhenqiu Huang commented on FLINK-31275:
---
I am recently working on Flink integration with OpenLineage. In our org, customers mainly use the DataStream API for Iceberg, Kafka and Cassandra. As the Table API is not even used, implementing these TableLineageVertex classes is probably not the best way. I feel the most painful thing is to infer the schema of the source/sink from a lineage perspective. If the schema info can be provided in the Flink connector, the integration in OpenLineage or even other frameworks will be clean and concise.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781613#comment-17781613 ] Fang Yong commented on FLINK-31275:
---
Hi [~mobuchowski], thanks for your comments. In the current FLIP the `LineageVertex` is the top interface for vertexes in the lineage graph; it will be used in Flink SQL jobs and datastream jobs.

1. For table connectors in SQL jobs, there will be a `TableLineageVertex` which is generated from a Flink catalog-based table and provides the catalog context and table schema for the specified connector. The table lineage vertex and edge implementations will be created from dynamic tables for connectors in Flink, and they will be updated when the connectors are updated.

2. For customized source/sink in datastream jobs, we can get source and sink `LineageVertex` implementations from `LineageVertexProvider`. When users implement customized lineage vertexes and edges, they need to update them when their connectors are updated.

IIUC, do you mean we should give an implementation of `LineageVertex` for datastream jobs and users can provide source/sink information there just like `TableLineageVertex` in SQL jobs? Then listeners can use the datastream lineage vertex, which is similar to the table lineage vertex? Due to the flexibility of the source and sink in `DataStream`, we think it's hard to cover all of them, so we just provide `LineageVertex` and `LineageVertexProvider` for them. So we left this flexibility to users and listeners. If a custom connector is a table in a `DataStream` job, users can return `TableLineageVertex` in the `LineageVertexProvider`.

And for the following `LineageVertex`:

```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();

    /* List of datasets that are consumed by this job */
    List<Dataset> inputs();

    /* List of datasets that are produced by this job */
    List<Dataset> outputs();
}
```

We tend to provide independent edge descriptions of connectors in `LineageEdge` for the lineage graph instead of adding datasets in `LineageVertex`. The `LineageVertex` here is the `DataSet` you mentioned. WDYT? Hope to hear from you, thanks
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781471#comment-17781471 ] Maciej Obuchowski commented on FLINK-31275:
---
[~zjureel] Sorry, it's probably too late for this comment as the FLIP is accepted, but I'll still voice a concern here from [OpenLineage|https://openlineage.io/], as we'd want to implement this interface.

It seems like the intended way for the LineageVertex interface is to just provide config context to particular nodes:

```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
}
```

and then, in a particular case when the listener understands a particular implementation, to provide more information:

```
// Create kafka source class with lineage vertex
public class KafkaVectorSource extends KafkaSource implements LineageVertexProvider {
    int capacity;
    String valueType;

    public LineageVertex lineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}
```

I think this is problematic because it strongly couples the listener to a particular vertex implementation. If you want to get the list of datasets that are read by a particular Flink job, you'll have to understand where the config is coming from and its structure. Additionally, sometimes config is not everything we need to get lineage - for example, for the Kafka connector we could get a regex pattern used for reading that we'd need to resolve ourselves. Or, if the connector subclasses `LineageVertex`, then another option is to get additional information from the subclass - but still, the listener has to understand it. Another problem is that the configuration structure for a particular connector can have breaking changes between versions - so we're tied not only to the connector, but also to a particular version of it.

But if we pushed the responsibility of understanding the datasets that a particular vertex of the graph produces to the connector itself, we'd not have this problem. First, the connector understands where it's reading from and writing to - so providing that information is easy for it. Second, the versioning problem does not exist - because the connector can update the code responsible for providing dataset information in the same PR that breaks it, which will be transparent for the listener. I would imagine the interface to be just something like this:

```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();

    /* List of datasets that are consumed by this job */
    List<Dataset> inputs();

    /* List of datasets that are produced by this job */
    List<Dataset> outputs();
}
```

What a dataset is in this case is debatable: from the OL perspective it would be best if this would be something similar to [https://openlineage.io/apidocs/javadoc/io/openlineage/client/openlineage.dataset] - it has a name (ex. table name) and a namespace (ex. a standardized database identifier). It also provides an extensible list of facets that represent additional information about the dataset that the particular connector wants to expose together with the dataset identifier - ex. something that represents the table schema or the size of the dataset. It could be something Flink-specific, but it should allow particular connectors to expose the additional information.
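The point above - that the connector itself can resolve a subscription pattern into concrete datasets, which a listener reading only raw config cannot do - can be sketched as follows. All class and method names are assumptions for illustration:

```java
import java.util.List;
import java.util.regex.Pattern;

public class VertexSketch {
    record Dataset(String namespace, String name) {}

    interface LineageVertex {
        List<Dataset> inputs();
        List<Dataset> outputs();
    }

    // A Kafka-like source that subscribed via a regex: it reports the topics it
    // actually resolved against the cluster, not the opaque pattern string.
    static LineageVertex kafkaPatternSource(Pattern pattern, List<String> clusterTopics) {
        List<Dataset> resolved = clusterTopics.stream()
            .filter(t -> pattern.matcher(t).matches())
            .map(t -> new Dataset("kafka://broker:9092", t))
            .toList();
        return new LineageVertex() {
            public List<Dataset> inputs() { return resolved; }
            public List<Dataset> outputs() { return List.of(); }
        };
    }

    public static void main(String[] args) {
        LineageVertex v = kafkaPatternSource(Pattern.compile("orders-.*"),
            List.of("orders-eu", "orders-us", "payments"));
        v.inputs().forEach(d -> System.out.println(d.name()));
    }
}
```

Here a generic listener calling `inputs()` sees `orders-eu` and `orders-us` directly; with the config-only interface it would see only the pattern `orders-.*` and would have to query the cluster itself.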
> Flink supports reporting and storage of source/sink tables relationship > --- > > Key: FLINK-31275 > URL: https://issues.apache.org/jira/browse/FLINK-31275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > FLIP-314 has been accepted > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781458#comment-17781458 ] Maciej Obuchowski commented on FLINK-31275: --- [~zjureel] Sorry, it's probably too late for this comment as the FLIP is accepted, but I'll still voice a concern here: it seems like the intended way for the LineageVertex interface is to just provide config context.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778890#comment-17778890 ] Fang Yong commented on FLINK-31275: --- [~ZhenqiuHuang] Sorry for the late reply. Please feel free to comment on the issue if you have any ideas or would like to take it, thanks
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775527#comment-17775527 ] Zhenqiu Huang commented on FLINK-31275: --- [~zjureel] We have similar requirements. To accelerate the development, I can help on some Jira tickets.
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772734#comment-17772734 ] Fang Yong commented on FLINK-31275: --- [~ZhenqiuHuang] Thanks for your attention, FLIP-314 is on voting: https://lists.apache.org/thread/dxdqjc0dd8rf1vbdg755zo1n2zj1tj8d > Flink supports reporting and storage of source/sink tables relationship > --- > > Key: FLINK-31275 > URL: https://issues.apache.org/jira/browse/FLINK-31275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Fang Yong >Priority: Major > > Currently flink generates the job id in `JobGraph`, which can identify a job. On > the other hand, flink creates source/sink tables in the planner. We need to create > relations between the source and sink tables for the job with an identifier id
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772399#comment-17772399 ] Zhenqiu Huang commented on FLINK-31275: --- [~jark] This JIRA is mentioned in FLIP-314. Is there anyone who is actually working on it?
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746936#comment-17746936 ] Konstantin Knauf commented on FLINK-31275: -- I will mark this as "Not finished" for Flink 1.18 and remove the fixVersion from the ticket as the feature freeze has passed. Thanks, Konstantin (one of the release managers for Flink 1.18)