[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2024-01-31 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813003#comment-17813003
 ] 

Fang Yong commented on FLINK-31275:
---

Hi [~mobuchowski] [~ZhenqiuHuang], I feel there are many details that need 
to be discussed regarding the interfaces. Would it be convenient for you to 
send an email to me (zjur...@gmail.com) so that we can schedule an offline 
meeting to discuss it first, and then start a new discussion on the dev 
mailing list? Hope to hear from you, thanks :)

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2024-01-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807671#comment-17807671
 ] 

Martijn Visser commented on FLINK-31275:


Looking at the discussions happening here, I think this should be brought back 
to the dev mailing list instead of being discussed in Jira.

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2024-01-16 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807486#comment-17807486
 ] 

Zhenqiu Huang commented on FLINK-31275:
---

Hi everyone, I also want to bump the thread due to internal needs. I feel the 
OpenLineage community gives a very good suggestion to define an intermediate 
representation (Dataset) for the metadata of a source/sink. Also, a LineageVertex 
could definitely have multiple datasets, for example for hybrid source users who 
read from Kafka first and then switch to Iceberg. Given this, I feel the config 
should be in the dataset rather than the LineageVertex. On the other hand, we want 
to make column lineage possible, so having the query in the dataset will allow 
lineage providers to analyze the column relationships. For the input/output schema, 
we may put it into a facet; it could be optional depending on the connector 
implementation. What do you think [~zjureel] [~mobuchowski]?


{code:java}
public interface LineageVertex {
    /* List of input (for source) or output (for sink) datasets interacted with by the connector. */
    List<Dataset> datasets();
}

public interface Dataset {
    /* Name of this particular dataset. */
    String name();
    /* Unique name of this dataset's datasource. */
    String namespace();
    /* Query used to generate the dataset, if there is one. */
    String query();
    /* Facets describing the particular information of the dataset. */
    Map<String, Facet> facets();
}
{code}

Facet types could be SchemaFacet and ConfigFacet.
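
To make the multiple-dataset point concrete, here is a rough, self-contained sketch (all class names below are illustrative assumptions, not FLIP-314 or Flink API) of how a hybrid Kafka-then-Iceberg source could expose two datasets with facets:

{code:java}
import java.util.List;
import java.util.Map;

public class HybridSourceLineageSketch {

    // Marker interface standing in for the proposed Facet.
    interface Facet {}

    // Minimal holder matching the proposed Dataset fields.
    record SimpleDataset(String name, String namespace, String query, Map<String, Facet> facets) {}

    record SchemaFacet(Map<String, String> fields) implements Facet {}

    record ConfigFacet(Map<String, String> options) implements Facet {}

    public static void main(String[] args) {
        // One source vertex, two datasets: the Kafka topic read first and the Iceberg table read afterwards.
        List<SimpleDataset> datasets = List.of(
            new SimpleDataset("orders", "kafka://broker-1:9092", null,
                Map.of("schema", new SchemaFacet(Map.of("order_id", "BIGINT")),
                       "config", new ConfigFacet(Map.of("scan.startup.mode", "earliest-offset")))),
            new SimpleDataset("warehouse.orders", "iceberg://prod-catalog", null,
                Map.of("schema", new SchemaFacet(Map.of("order_id", "BIGINT")))));
        datasets.forEach(d -> System.out.println(d.namespace() + "/" + d.name()));
    }
}
{code}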



> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-12-21 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799357#comment-17799357
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

Hey [~zjureel], is there any news regarding this feature?

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-12-07 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794165#comment-17794165
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

Hey [~zjureel]. I had an additional idea - for SQL jobs, it would be nice to attach 
the actual text representation of the query. This would allow data catalogs to 
accurately show what query created the table.

Additionally, is there any news about progress? We are excited about this 
feature, and are willing to help with testing and implementation.

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-27 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17790083#comment-17790083
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

>Why would a `LineageVertex` have multiple inputs or outputs? We hope that 
>'LineageVertex' describes a single source or sink, rather than multiple.

I have two good counterexamples, for read and write, where one source or sink 
describes more than one dataset:
 * KafkaSource can read from multiple topics, or even a wildcard pattern.
 * Another case is where one company used the JDBC connector sink and had a 
very large number of destination tables (thousands), some of them with rather small 
amounts of data. The database would not work with a one-connection-per-table 
model, so I had a fork of the JDBC connector that could dynamically determine 
which table the connector should write the data to. I tried to contribute that but 
there was no interest. [https://github.com/apache/flink/pull/15102/files]

Flink is really flexible when it comes to the structure of a job, which should be 
reflected in the API.
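
To make the first counterexample concrete, here is a minimal sketch (the names DatasetId and datasetsFor are illustrative assumptions, not Flink or FLIP-314 API) of a source resolving a topic pattern into several datasets:

{code:java}
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternLineageSketch {

    // Illustrative dataset identifier: where the data lives and what it is called.
    record DatasetId(String namespace, String name) {}

    // One source vertex, many datasets: every topic matching the pattern is its own dataset.
    static List<DatasetId> datasetsFor(Pattern topicPattern, List<String> clusterTopics) {
        return clusterTopics.stream()
                .filter(topic -> topicPattern.matcher(topic).matches())
                .map(topic -> new DatasetId("kafka://broker-1:9092", topic))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(datasetsFor(Pattern.compile("orders-.*"),
                List.of("orders-eu", "orders-us", "payments")));
    }
}
{code}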

>We introduce `LineageEdge` in this FLIP to describe the relation between 
>sources and sinks instead of adding `input` or `output` in `LineageVertex`.

I think those are two separate things, as different datasets in one source 
can have different output sinks.

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-26 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789890#comment-17789890
 ] 

Fang Yong commented on FLINK-31275:
---

[~mobuchowski] Sorry for the late reply.

What is the `DataSet` in `LineageVertex` used for? Why would a `LineageVertex` have 
multiple inputs or outputs? We hope that 'LineageVertex' describes a single 
source or sink, rather than multiple. So I prefer to add a `Map` 
to `LineageVertex` directly to describe the particular information. We 
introduce `LineageEdge` in this FLIP to describe the relation between sources 
and sinks instead of adding `input` or `output` in `LineageVertex`.



> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-16 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786866#comment-17786866
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

[~zjureel] I think this is a step in the right direction.

For `StreamingFacet`, if the intention is to attach it to connectors 
interfacing with services like Kafka, Kinesis, Pulsar, or PubSub, then I believe it 
needs the ability to describe multiple topics - since, at least for Kafka, you can 
read from either a list of topics or a wildcard pattern. The same goes for 
`SchemaFacet` - the topics can have different schemas.

The way OpenLineage deals with this - and Flink could too - is to 
associate those with the concept of a `Dataset`, rather than with the job itself. So, 
we'd have roughly:
{code:java}
public interface LineageVertex {
    /* List of input (for source) or output (for sink) datasets interacted with by the connector. */
    List<Dataset> datasets();
    /* Facets for the lineage vertex to describe the general information of the source/sink. */
    Map<String, Facet> facets();
    /* Config for the lineage vertex containing all the options for the connector. */
    Map<String, String> config();
}

public interface Dataset {
    /* Name of this particular dataset. */
    String name();
    /* Unique name of this dataset's datasource. */
    String namespace();
    /* Facets describing the particular information of the dataset. */
    Map<String, Facet> facets();
}
{code}

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-14 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785782#comment-17785782
 ] 

Fang Yong commented on FLINK-31275:
---

[~mobuchowski] Sorry for the late reply, I think I get your point now, and thank 
you for your valuable suggestions.

Based on our discussion, I think we can add facets to the `LineageVertex` as 
follows.

{code:java}
public interface LineageVertex {
    /* Facets for the lineage vertex to describe the information of the source/sink. */
    Map<String, Facet> facets();
    /* Config for the lineage vertex containing all the options for the connector. */
    Map<String, String> config();
}
{code}

We can implement some common facets such as `ConnectorFacet`, `StreamingFacet`, 
and `SchemaFacet`, and we can add new `Facet` implementations as needed in the future.
For `TableLineageVertex` we can create facets from the config in the table DDL, which 
will make the table options meaningful.
For DataStream jobs we can create facets from the data source and sink.

{code:java}
/** Connector facet with the name and address of the connector, for example, jdbc 
and the url for the JDBC connector, or kafka and the servers for the Kafka connector. */
public interface ConnectorFacet extends Facet {
    String name();
    String address();
}

/** Facet for a streaming connector. */
public interface StreamingFacet extends Facet {
    String topic();
    String group();
    String format();
    String start();
}

/** Facet for the schema in a connector. */
public interface SchemaFacet extends Facet {
    Map<String, String> fields();
}
{code}
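
For illustration, a listener could consume these facets roughly as follows (a minimal sketch against the interfaces above; the facet handling shown is an assumption, not part of the proposal):

{code:java}
import java.util.Map;

class FacetReportingSketch {
    // Walk the facets of one vertex and report only what the listener understands.
    static void reportVertex(LineageVertex vertex) {
        for (Map.Entry<String, Facet> entry : vertex.facets().entrySet()) {
            Facet facet = entry.getValue();
            if (facet instanceof ConnectorFacet connector) {
                System.out.println("connector=" + connector.name() + " address=" + connector.address());
            } else if (facet instanceof SchemaFacet schema) {
                schema.fields().forEach((field, type) -> System.out.println(field + ": " + type));
            }
            // Unknown facet types are skipped, so custom connectors do not break the listener.
        }
    }
}
{code}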

What do you think of this? Thanks



> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-09 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784486#comment-17784486
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

> So, I think our current point of divergence is which level of abstraction the 
> user needs to perceive.

I would differentiate between the "end user" - who just writes job code, whether in 
DataStream or SQL - the listener developer, and the connector developer. So ideally for 
me, the abstraction level for an end user who just works on job-level code is that 
they would not need to do anything besides configuring the listener and 
enjoying the lineage graph in their preferred lineage backend.

> In the current FLIP, for DataStream jobs, listener developers need to 
> identify whether the `LineageVertex` is a `KafkaSourceLineageVertex` or a 
> `JdbcLineageVertex`. You mean we need to define another layer, such as the 
> `DataSetConfig` interface, and then the listener developer can identify 
> whether it is a `KafkaDataSetConfig` or a `JdbcDataSetConfig`, right?

For the listener developer, I would argue that for transmitting basic lineage - 
data source, dataset names, possibly schema and column-level lineage - the 
developer should be able to get this data using a basic interface built into 
this FLIP. So, basic support would mean just recognizing `DataSetConfig` (or 
having this data in the basic LineageVertex) - without any classes that strongly 
tie the listener to particular connectors. This is especially important for 
authors of generic (not only in-house) listeners, like an OpenLineageListener or 
perhaps a DatahubListener, that would like to support lineage returned from custom 
connectors.

For the connector developer, they should implement this basic interface, and then 
all listener implementations would be able to understand the gathered lineage - 
without any knowledge of that specific connector.

Basically, instead of an N x M problem, where there are N connectors and M 
listeners and every listener has to have specific code for each connector, we 
should have a single intermediate interface; that would save everyone's time.

Then, it would be best if there were a standard way for connectors to extend the 
returned data structure. This could be inheritance, as the FLIP suggests, but I 
think a better, though maybe less type-safe, way would be to provide something like 
`Map<String, Facet>`, where a Facet is just a self-contained, atomic piece of 
extension metadata - things like information about the output storage system, 
the connector name and version, or perhaps some metrics about job execution - it's 
up to the connector developer. I believe it's better, because a lack of knowledge of a 
particular `LineageVertex` subtype doesn't prevent you from getting lineage.
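
A rough sketch of what that could look like (the facet names and fields below are illustrative assumptions only, not proposed API):

{code:java}
import java.util.Map;

// Marker for a self-contained, atomic piece of extension metadata.
interface Facet {}

// Examples of facets a connector might choose to attach.
record StorageSystemFacet(String system, String version) implements Facet {}
record IoMetricsFacet(long recordsWritten, long bytesWritten) implements Facet {}

class FacetMapSketch {
    public static void main(String[] args) {
        Map<String, Facet> facets = Map.of(
            "storage", new StorageSystemFacet("kafka", "3.4"),
            "ioMetrics", new IoMetricsFacet(1_000_000L, 256_000_000L));

        // A listener that does not know a facet type can still report the ones it understands.
        facets.forEach((name, facet) -> System.out.println(name + " -> " + facet));
    }
}
{code}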

So yes, a good comparison is the proposed `TableLineageVertex` - I would just extend 
this concept to DataStream jobs and provide (optionally?) more metadata, with a 
slightly different interface for extension.

 

I want to add that despite some disagreements on this interface, I respect the 
work you've done on this topic [~zjureel], and I believe that even without 
my points being addressed, the interface is a big step forward for better 
observability of Flink jobs.

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-06 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783147#comment-17783147
 ] 

Fang Yong commented on FLINK-31275:
---

Hi [~mobuchowski] Thanks for your reply.

I think our ideas are consistent, just at different levels of abstraction. The 
interface `LineageVertex` is the top interface for connectors in Flink, and we 
implement `TableLineageVertex` for tables, because a table is a complete 
definition, including the database, schema, etc. We put the options from the 
`WITH` clause into a map, which is consistent with the definition and usage habits of 
SQL in Flink.

For the official Flink connectors, we will implement the `LineageVertex` for 
`Source` and `InputFormat` in `DataStream` jobs, such as `KafkaSourceLineage`, 
etc., as we mentioned in the FLIP: `We will implement LineageVertexProvider for 
the builtin source and sink such as KafkaSource, HiveSource, 
FlinkKafkaProducerBase and etc.`
End users don't need to implement them. To be consistent with the 
usage habits of tables, we will put the corresponding information into a map 
when implementing them, and users can obtain it from there.
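
As a rough illustration (the two small interfaces below only stand in for the FLIP-314 ones, and the Kafka-like class and its values are invented; none of this is final API), a built-in source could fill that map itself:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Stand-in for the FLIP's lineage vertex: here reduced to the connector options.
interface LineageVertex {
    Map<String, String> config();
}

// Stand-in for the FLIP's provider interface implemented by sources/sinks.
interface LineageVertexProvider {
    LineageVertex lineageVertex();
}

class MyKafkaLikeSource implements LineageVertexProvider {
    @Override
    public LineageVertex lineageVertex() {
        // The connector knows its own topic and options, so it fills the map that
        // listeners read; end users only configure the listener.
        Map<String, String> config = new HashMap<>();
        config.put("topic", "orders");
        config.put("properties.bootstrap.servers", "broker-1:9092");
        return () -> config;
    }
}
{code}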

So, I think our current point of divergence is which level of abstraction the 
user needs to perceive. In the current FLIP, for DataStream jobs, listener 
developers need to identify whether the `LineageVertex` is a 
`KafkaSourceLineageVertex` or a `JdbcLineageVertex`. You mean we need to define 
another layer, such as the `DataSetConfig` interface, and then the listener 
developer can identify whether it is a `KafkaDataSetConfig` or a 
`JdbcDataSetConfig`, right?

Our current use of `LineageVertex` is mainly to allow flexibility and to 
facilitate adding returned information to the lineage vertex of a 
`DataStream`, such as the vector-type data source information mentioned in the 
FLIP example. At the same time, connector maintainers can also easily provide a 
lineage vertex for customized connectors. If the connector is in table format, 
we prefer that users directly provide a `TableLineageVertex` instance.





> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-03 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782529#comment-17782529
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

[~zjureel] 
Generally the idea is that the interface should allow connecting the 
listener and a particular connector in a scenario where they are written by 
totally separate people who don't even know of each other. For example, I 
should be able to write an OpenLineage listener (or Datahub, Atlan...) and it 
should work - and report lineage - with some proprietary, internal connector 
that implements the FLIP-314 interface.

The way of doing it in the currently proposed and implemented interface 
requires either
 # the listener knowing the LineageVertex subclasses of all possible connectors, or
 # the listener providing its own specialized subclass that the connectors would 
implement.

The problem with the first idea is that it's not possible to provide an extensibility 
mechanism. If I have to enumerate all the supported connectors, I won't be able 
to cover a proprietary connector whose code I don't know - and 
the connector author won't be able to do it either.

The problem with the second idea is that it pretty much duplicates effort. 
It's hard to get consensus around a particular interface, and it's even harder to 
have an implementation of it in open source. Even if this were done, it would pretty 
much force everyone to use this listener - which I believe is the opposite of 
the goal of an open interface like FLIP-314.

> 2. For customized source/sink in DataStream jobs, we can get source and sink 
> `LineageVertex` implementations from `LineageVertexProvider`. When users 
> implement customized lineage vertices and edges, they need to update them when 
> their connectors are updated.

> IIUC, do you mean we should give an implementation of `LineageVertex` for 
> DataStream jobs where users can provide source/sink information, just like 
> `TableLineageVertex` in SQL jobs? Then listeners can use the DataStream 
> lineage vertex, which is similar to the table lineage vertex?

In a way, yes. While DataStream jobs are flexible, sources and sinks generally 
read from and write to known data systems, and those connectors know them. On 
the other hand, I think the best possible interface wouldn't have 
`TableLineageVertex` or `DatasetLineageVertex`, just one unified interface that 
would allow connectors themselves to describe the list of datasets read from 
and written to, like the one I posted in a previous comment.

> Due to the flexibility of the source and sink in `DataStream`, we think it's 
> hard to cover all of them, so we just provide `LineageVertex` and 
> `LineageVertexProvider` for them. So we leave this flexibility to users and 
> listeners. If a custom connector is a table in a `DataStream` job, users can 
> return a `TableLineageVertex` in the `LineageVertexProvider`.

My idea is that connector authors provide those - not end users. End users 
providing those means the work is duplicated - the Kafka connector always 
knows it's reading from some topics, and the JDBC connector knows it's writing to a 
particular table in a particular database.

However, end users should have the ability to enrich this data with some 
additional information.

> I feel the most painful thing is to infer the schema of the source/sink from a 
> lineage perspective. If the schema info can be provided in the Flink connector, 
> the integration in OpenLineage or even other frameworks will be clean and 
> concise.

Agreed - the schema is very important next to the actual dataset identifier, and very 
important for any possible future column-level lineage work.

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-03 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782461#comment-17782461
 ] 

Fang Yong commented on FLINK-31275:
---

[~ZhenqiuHuang] Sounds good to me, I think we can consider this together. If 
there are some common connectors and relevant schemas for `DataStream` jobs, 
we can define these lineage vertices in Flink, such as a `ColumnsLineageVertex` 
which has multiple columns with different data types?

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-11-01 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781841#comment-17781841
 ] 

Zhenqiu Huang commented on FLINK-31275:
---

I am currently working on Flink integration with OpenLineage. In our org, 
customers mainly use the DataStream API for Iceberg, Kafka, and Cassandra. As 
the Table API is not even used, implementing these TableLineageVertex classes is 
probably not the best way.

I feel the most painful thing is to infer the schema of the source/sink from a 
lineage perspective. If the schema info can be provided in the Flink connector, the 
integration in OpenLineage or even other frameworks will be clean and concise.

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-10-31 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781613#comment-17781613
 ] 

Fang Yong commented on FLINK-31275:
---

Hi [~mobuchowski], thanks for your comments. In the current FLIP the 
`LineageVertex` is the top interface for vertices in the lineage graph; it will be 
used in Flink SQL jobs and DataStream jobs.

1. For table connectors in SQL jobs, there will be a `TableLineageVertex` which 
is generated from the Flink catalog-based table and provides the catalog context and 
table schema for the specified connector. The table lineage vertex and edge 
implementations will be created from the dynamic tables for connectors in Flink, 
and they will be updated when the connectors are updated.

2. For customized source/sink in DataStream jobs, we can get source and sink 
`LineageVertex` implementations from `LineageVertexProvider`. When users 
implement customized lineage vertices and edges, they need to update them when 
their connectors are updated.

IIUC, do you mean we should give an implementation of `LineageVertex` for 
DataStream jobs where users can provide source/sink information, just like 
`TableLineageVertex` in SQL jobs? Then listeners can use the DataStream lineage 
vertex, which is similar to the table lineage vertex?

Due to the flexibility of the source and sink in `DataStream`, we think it's 
hard to cover all of them, so we just provide `LineageVertex` and 
`LineageVertexProvider` for them. So we leave this flexibility to users and 
listeners. If a custom connector is a table in a `DataStream` job, users can 
return a `TableLineageVertex` in the `LineageVertexProvider`.

And for the following `LineageVertex`:
```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
    /* List of datasets that are consumed by this job. */
    List<Dataset> inputs();
    /* List of datasets that are produced by this job. */
    List<Dataset> outputs();
}
```
We tend to provide independent edge descriptions of connectors in `LineageEdge` 
for the lineage graph instead of adding datasets to `LineageVertex`. The 
`LineageVertex` here is the `DataSet` you mentioned.
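
For clarity, the relation we mean could look roughly like this (a sketch only; the method names are assumptions, not quoted from the FLIP):
```
/** An edge of the lineage graph connecting one source vertex to one sink vertex. */
public interface LineageEdge {
    /* The source vertex this edge starts from. */
    LineageVertex source();
    /* The sink vertex this edge points to. */
    LineageVertex sink();
}
```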

WDYT? Hope to hear from you, thanks






> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-10-31 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781471#comment-17781471
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

[~zjureel] Sorry, it's probably too late for this comment as the FLIP is accepted. 
But I'll still voice a concern here from [OpenLineage|https://openlineage.io/], 
as we'd want to implement this interface.

It seems like the intended way for the LineageVertex interface is to just provide 
config context to particular nodes:

```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
}
```
and then, in a particular case, when the listener understands a particular 
implementation, provide more information:

```
// Create a Kafka source class with a lineage vertex
public class KafkaVectorSource extends KafkaSource implements LineageVertexProvider {
    int capacity;
    String valueType;

    public LineageVertex lineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}
```
 
I think this is problematic because it strongly couples the listener to a 
particular vertex implementation.
If you want to get the list of datasets that are read by a particular Flink job, 
you'll have to understand where the config is coming from and its structure. 
Additionally, sometimes the config is not everything we need to get lineage - for 
example, for the Kafka connector we could get a regex pattern used for reading that 
we'd need to resolve ourselves.
Or, if the connector subclasses `LineageVertex`, then another option is to get 
additional information from the subclass - but still, the listener has to 
understand it.
Another problem is that the configuration structure for a particular connector 
can have breaking changes between versions - so we're tied not only to the 
connector, but also to a particular version of it.

But if we pushed the responsibility of understanding the datasets that a 
particular vertex of the graph produces to the connector itself, we wouldn't have 
this problem.
First, the connector understands where it's reading from and writing to - so 
providing that information is easy for it. 
Second, the versioning problem does not exist - because the connector can 
update the code responsible for providing dataset information in the same PR that 
breaks it, which will be transparent for the listener.


I would imagine the interface to be just something like this:


```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
    /* List of datasets that are consumed by this job. */
    List<Dataset> inputs();
    /* List of datasets that are produced by this job. */
    List<Dataset> outputs();
}
```
 
What a dataset is in this case is debatable: from the OL perspective it would be best 
if this were something similar to 
[https://openlineage.io/apidocs/javadoc/io/openlineage/client/openlineage.dataset]
 - with a name (e.g. the table name) and a namespace (e.g. a standardized database 
identifier). It also provides an extensible list of facets that represent 
additional information about the dataset that the particular connector wants 
to expose together with the dataset identifier - e.g. something that represents 
the table schema or the size of the dataset. It could be something Flink-specific, 
but it should allow particular connectors to expose the additional information.
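
As a rough illustration (the identifier strings below only loosely follow OpenLineage's naming convention and are assumptions, not part of this proposal), a Kafka topic and a JDBC table could map to name/namespace pairs like this:
```
import java.util.List;

class DatasetNamingSketch {
    // Illustrative dataset identifier: namespace says where the data lives, name says what it is.
    record DatasetId(String namespace, String name) {}

    public static void main(String[] args) {
        List<DatasetId> examples = List.of(
            new DatasetId("kafka://broker-1:9092", "orders"),
            new DatasetId("postgres://db-host:5432", "shop.public.orders"));
        examples.forEach(id -> System.out.println(id.namespace() + " / " + id.name()));
    }
}
```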
 

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-10-31 Thread Maciej Obuchowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781458#comment-17781458
 ] 

Maciej Obuchowski commented on FLINK-31275:
---

[~zjureel] Sorry, it's probably too late for this comment as the FLIP is accepted. 
But I'll still voice a concern here:

It seems like the intended way for the LineageVertex interface is to just provide 
config context:



> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-10-23 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778890#comment-17778890
 ] 

Fang Yong commented on FLINK-31275:
---

[~ZhenqiuHuang] Sorry for the late reply, and please feel free to comment on the 
issues if you have any ideas or would like to take them, thanks

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-10-15 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775527#comment-17775527
 ] 

Zhenqiu Huang commented on FLINK-31275:
---

[~zjureel]
We have similar requirements. To accelerate the development, I can help with some 
of the Jira tickets.

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-10-06 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772734#comment-17772734
 ] 

Fang Yong commented on FLINK-31275:
---

[~ZhenqiuHuang] Thanks for your attention. FLIP-314 is currently under vote: 
https://lists.apache.org/thread/dxdqjc0dd8rf1vbdg755zo1n2zj1tj8d

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> Currently Flink generates a job id in `JobGraph` which can identify a job. On 
> the other hand, Flink creates source/sink tables in the planner. We need to create 
> relations between the source and sink tables for the job with an identifier id



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-10-05 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772399#comment-17772399
 ] 

Zhenqiu Huang commented on FLINK-31275:
---

[~jark]
This JIRA is mentioned in FLIP-314. Is there anyone who is actually working on 
it? 

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> Currently Flink generates a job id in `JobGraph` which can identify a job. On 
> the other hand, Flink creates source/sink tables in the planner. We need to create 
> relations between the source and sink tables for the job with an identifier id



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746936#comment-17746936
 ] 

Konstantin Knauf commented on FLINK-31275:
--

I will mark this as "Not finished" for Flink 1.18 and remove the fixVersion 
from the ticket as the feature freeze has passed. Thanks, Konstantin (one of 
the release managers for Flink 1.18)

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> Currently Flink generates a job id in `JobGraph` which can identify a job. On 
> the other hand, Flink creates source/sink tables in the planner. We need to create 
> relations between the source and sink tables for the job with an identifier id



--
This message was sent by Atlassian Jira
(v8.20.10#820010)