Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs merged PR #25012: URL: https://github.com/apache/flink/pull/25012 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs commented on PR #25012:
URL: https://github.com/apache/flink/pull/25012#issuecomment-2212441328

Thanks @HuangZhenQiu , +1
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
flinkbot commented on PR #25012:
URL: https://github.com/apache/flink/pull/25012#issuecomment-2205241298

## CI report:

* ac8b90883eb1bf69b8b0b8bc3611c346ab0f2fc6 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu opened a new pull request, #25012:
URL: https://github.com/apache/flink/pull/25012

## What is the purpose of the change

1. Add table lineage vertices to transformations in the planner. The final LineageGraph is generated from the transformations and put into the StreamGraph. The lineage graph will be published to the lineage listener in a follow-up PR.
2. Deprecated table sources and sinks are not considered, because they do not provide enough information to derive the name and namespace of the lineage dataset.

## Brief change log

- Add table lineage interfaces and default implementations.
- Create lineage vertices and attach them to transformations during the conversion from physical plan to transformations.

## Verifying this change

1. Add TableLineageGraphTest for both stream and batch mode.
2. Add LineageGraph verification in TransformationsTest for legacy sources.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
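To make point 1 concrete, here is a minimal sketch of how a lineage graph could be assembled once source and sink transformations carry lineage vertices. All types are simplified, hypothetical stand-ins, not Flink's real `LineageGraphUtils`, `LineageVertex` or `DefaultLineageGraph`. Two further assumptions: transformations without a vertex (such as deprecated sources and sinks) are skipped, and every sink is linked to every source, because this PR covers table-level rather than column-level lineage.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the lineage types discussed in this PR;
// these are NOT Flink's actual LineageDataset/LineageVertex/LineageEdge/LineageGraph.
final class LineageGraphSketch {

    /** A dataset identified by name and namespace. */
    record Dataset(String name, String namespace) {}

    /** A lineage vertex attached to a source or sink transformation. */
    record Vertex(List<Dataset> datasets) {}

    /** A source-to-sink relation. */
    record Edge(Vertex source, Vertex sink) {}

    record Graph(List<Vertex> sources, List<Vertex> sinks, List<Edge> relations) {}

    enum Kind { SOURCE, INTERNAL, SINK }

    /** A transformation that may carry a lineage vertex; null means "no lineage available". */
    record Transformation(Kind kind, Vertex lineageVertex) {}

    static Graph buildGraph(List<Transformation> transformations) {
        List<Vertex> sources = new ArrayList<>();
        List<Vertex> sinks = new ArrayList<>();
        for (Transformation t : transformations) {
            // Skip transformations without a vertex, e.g. deprecated table sources/sinks.
            if (t.lineageVertex() == null) {
                continue;
            }
            if (t.kind() == Kind.SOURCE) {
                sources.add(t.lineageVertex());
            } else if (t.kind() == Kind.SINK) {
                sinks.add(t.lineageVertex());
            }
            // INTERNAL vertices are ignored: only source/sink-level lineage is in scope.
        }
        // Assumption: without column-level lineage, link every sink to every source.
        List<Edge> relations = new ArrayList<>();
        for (Vertex sink : sinks) {
            for (Vertex source : sources) {
                relations.add(new Edge(source, sink));
            }
        }
        return new Graph(sources, sinks, relations);
    }
}
```

With one source, one internal operator and one sink, this yields a single source-to-sink relation, and the internal operator contributes nothing.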
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs merged PR #24618:
URL: https://github.com/apache/flink/pull/24618
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs commented on PR #24618:
URL: https://github.com/apache/flink/pull/24618#issuecomment-2204890224

Thanks @HuangZhenQiu , +1
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644431010

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java:
@@ -42,7 +42,7 @@
  * @param The type of the elements that result from this {@code OneInputTransformation}
  */
 @Internal
-public class OneInputTransformation extends PhysicalTransformation {
+public class OneInputTransformation extends TransformationWithLineage {

Review Comment:
Looks like LineageGraphUtils only handles Source/Sink transformations, so why make OneInputTransformation extend TransformationWithLineage?
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644429269

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java:
@@ -175,15 +200,32 @@ protected Transformation translateToPlanInternal(
                     provider.getClass().getSimpleName() + " is unsupported now.");
         }
+        LineageDataset tableLineageDataset =
+                TableLineageUtils.createTableLineageDataset(
+                        tableSourceSpec.getContextResolvedTable(), lineageVertex);
+
+        TableSourceLineageVertex sourceLineageVertex =
+                new TableSourceLineageVertexImpl(
+                        Arrays.asList(tableLineageDataset),
+                        provider.isBounded()
+                                ? Boundedness.BOUNDED
+                                : Boundedness.CONTINUOUS_UNBOUNDED);
+
         if (sourceParallelismConfigured) {
-            return applySourceTransformationWrapper(
-                    sourceTransform,
-                    planner.getFlinkContext().getClassLoader(),
-                    outputTypeInfo,
-                    config,
-                    tableSource.getChangelogMode(),
-                    sourceParallelism);
+            ((TransformationWithLineage) sourceTransform)
+                    .setLineageVertex(sourceLineageVertex);
+            Transformation sourceTransformationWrapper =
+                    applySourceTransformationWrapper(
+                            sourceTransform,
+                            planner.getFlinkContext().getClassLoader(),
+                            outputTypeInfo,
+                            config,
+                            tableSource.getChangelogMode(),
+                            sourceParallelism);
+            return sourceTransformationWrapper;
         } else {
+            ((TransformationWithLineage) sourceTransform)

Review Comment:
Currently,
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644427363

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableColumnLineageEdge.java:
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+
+import java.util.List;
+
+/**
+ * Column lineage from source table columns to each sink table column, one sink column may be
+ * aggregated by multiple tables and columns.
+ */
+@PublicEvolving
+public interface TableColumnLineageEdge {

Review Comment:
Where do we actually build column lineage edges?
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644422069

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java:
@@ -175,15 +200,32 @@ protected Transformation translateToPlanInternal(
                     provider.getClass().getSimpleName() + " is unsupported now.");
         }
+        LineageDataset tableLineageDataset =
+                TableLineageUtils.createTableLineageDataset(
+                        tableSourceSpec.getContextResolvedTable(), lineageVertex);
+
+        TableSourceLineageVertex sourceLineageVertex =
+                new TableSourceLineageVertexImpl(
+                        Arrays.asList(tableLineageDataset),
+                        provider.isBounded()
+                                ? Boundedness.BOUNDED
+                                : Boundedness.CONTINUOUS_UNBOUNDED);
+
         if (sourceParallelismConfigured) {
-            return applySourceTransformationWrapper(
-                    sourceTransform,
-                    planner.getFlinkContext().getClassLoader(),
-                    outputTypeInfo,
-                    config,
-                    tableSource.getChangelogMode(),
-                    sourceParallelism);
+            ((TransformationWithLineage) sourceTransform)

Review Comment:
We can extract this line before L214, as it is used in both branches.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644420673

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java:
@@ -175,15 +200,32 @@ protected Transformation translateToPlanInternal(
                     provider.getClass().getSimpleName() + " is unsupported now.");
         }
+        LineageDataset tableLineageDataset =
+                TableLineageUtils.createTableLineageDataset(
+                        tableSourceSpec.getContextResolvedTable(), lineageVertex);
+
+        TableSourceLineageVertex sourceLineageVertex =
+                new TableSourceLineageVertexImpl(
+                        Arrays.asList(tableLineageDataset),
+                        provider.isBounded()
+                                ? Boundedness.BOUNDED
+                                : Boundedness.CONTINUOUS_UNBOUNDED);
+
         if (sourceParallelismConfigured) {
-            return applySourceTransformationWrapper(
-                    sourceTransform,
-                    planner.getFlinkContext().getClassLoader(),
-                    outputTypeInfo,
-                    config,
-                    tableSource.getChangelogMode(),
-                    sourceParallelism);
+            ((TransformationWithLineage) sourceTransform)
+                    .setLineageVertex(sourceLineageVertex);
+            Transformation sourceTransformationWrapper =
+                    applySourceTransformationWrapper(
+                            sourceTransform,
+                            planner.getFlinkContext().getClassLoader(),
+                            outputTypeInfo,
+                            config,
+                            tableSource.getChangelogMode(),
+                            sourceParallelism);
+            return sourceTransformationWrapper;
         } else {
+            ((TransformationWithLineage) sourceTransform)

Review Comment:
The source transformation here may be arbitrary when using the TransformationScanProvider, and it may not extend TransformationWithLineage. We'll need to add an instanceof check first.
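The two review comments on this hunk (hoist the cast-and-set out of both branches, and guard it with an `instanceof` check) can be combined roughly as follows. This is a minimal sketch using hypothetical stand-in types, not the planner's real classes, and it assumes the `else` branch returns the source transformation unchanged, since that part of the method is not shown in the hunk.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-ins for the planner classes in the hunk; not Flink's real types.
final class SourceLineageSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    record SourceVertex(String dataset, Boundedness boundedness) {}

    /** Base type: a transformation that cannot carry lineage. */
    static class Transformation {}

    /** Subtype that can carry a lineage vertex. */
    static class TransformationWithLineage extends Transformation {
        private SourceVertex lineageVertex;

        void setLineageVertex(SourceVertex vertex) {
            this.lineageVertex = vertex;
        }

        SourceVertex getLineageVertex() {
            return lineageVertex;
        }
    }

    /** Attaches the vertex only when supported; returns whether it was attached. */
    static boolean attachLineage(Transformation transformation, SourceVertex vertex) {
        if (transformation instanceof TransformationWithLineage withLineage) {
            withLineage.setLineageVertex(vertex);
            return true;
        }
        // e.g. an arbitrary transformation from a TransformationScanProvider: skip instead of casting.
        return false;
    }

    static Transformation translate(
            Transformation sourceTransform,
            String dataset,
            boolean bounded,
            boolean sourceParallelismConfigured,
            UnaryOperator<Transformation> applyWrapper) {
        SourceVertex vertex =
                new SourceVertex(
                        dataset, bounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
        // Hoisted out of the if/else, so it runs once for both branches.
        attachLineage(sourceTransform, vertex);
        if (sourceParallelismConfigured) {
            return applyWrapper.apply(sourceTransform);
        }
        // Assumption: the else branch returns the source transformation unchanged.
        return sourceTransform;
    }
}
```

The same guard shape would apply on the sink side, where the transformation returned by `applySinkProvider` may not extend `TransformationWithLineage` either.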
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644417751

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
@@ -209,15 +227,28 @@ protected Transformation createSinkTransformation(
             sinkTransform = applyRowKindSetter(sinkTransform, targetRowKind.get(), config);
         }
-        return (Transformation)
-                applySinkProvider(
-                        sinkTransform,
-                        streamExecEnv,
-                        runtimeProvider,
-                        rowtimeFieldIndex,
-                        sinkParallelism,
-                        config,
-                        classLoader);
+        LineageDataset tableLineageDataset =
+                TableLineageUtils.createTableLineageDataset(
+                        tableSinkSpec.getContextResolvedTable(), lineageVertexOpt);
+
+        TableSinkLineageVertex sinkLineageVertex =
+                new TableSinkLineageVertexImpl(
+                        Arrays.asList(tableLineageDataset),
+                        TableLineageUtils.convert(inputChangelogMode));
+
+        Transformation transformation =
+                (Transformation)
+                        applySinkProvider(
+                                sinkTransform,
+                                streamExecEnv,
+                                runtimeProvider,
+                                rowtimeFieldIndex,
+                                sinkParallelism,
+                                config,
+                                classLoader);
+
+        ((TransformationWithLineage) transformation).setLineageVertex(sinkLineageVertex);

Review Comment:
The transformation created by `applySinkProvider` may be arbitrary when using the `TransformationSinkProvider`, and it may not extend `TransformationWithLineage`.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644414070

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
@@ -174,6 +180,18 @@ protected Transformation createSinkTransformation(
                             inputParallelism));
         }
+        Object outputObject = null;
+        if (runtimeProvider instanceof OutputFormatProvider) {

Review Comment:
It's a bit strange that we support `OutputFormatProvider` & `SinkFunctionProvider` here, but not `SinkProvider`. Since we've decided to drop support for deprecated sinks, it suffices to only support `SinkV2Provider`.
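The suggestion to derive lineage only from `SinkV2Provider` could look roughly like this. It is a minimal sketch: the interfaces are hypothetical stand-ins named after the providers in the comment, not Flink's real `SinkRuntimeProvider` hierarchy, and `lineageSinkOf` is an illustrative helper name rather than an existing method.

```java
import java.util.Optional;

// Hypothetical stand-ins named after Flink's sink runtime providers; not the real interfaces.
final class SinkProviderSketch {

    interface SinkRuntimeProvider {}

    /** Stand-in for a Sink V2 instance. */
    interface Sink {}

    interface SinkV2Provider extends SinkRuntimeProvider {
        Sink createSink();
    }

    /** Deprecated-style providers: no lineage is extracted from them. */
    interface SinkFunctionProvider extends SinkRuntimeProvider {}

    interface OutputFormatProvider extends SinkRuntimeProvider {}

    /** Returns the object lineage can be derived from, or empty for unsupported providers. */
    static Optional<Sink> lineageSinkOf(SinkRuntimeProvider provider) {
        if (provider instanceof SinkV2Provider v2Provider) {
            return Optional.of(v2Provider.createSink());
        }
        return Optional.empty();
    }
}
```

Returning `Optional.empty()` for the other providers keeps the deprecated paths out of the lineage code entirely, instead of branching on each legacy type.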
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644395633

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageEdgeImpl.java:
@@ -0,0 +1,46 @@
+/*
+ * [Apache License 2.0 header omitted]
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Implementation of LineageEdge. */
+@Internal
+public class LineageEdgeImpl implements LineageEdge {

Review Comment:
nit: given the naming of `DefaultLineageDataset` and `DefaultLineageGraph`, why not name it `DefaultLineageEdge` for consistency?
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
X-czh commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1644393072

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java:
@@ -42,7 +42,7 @@
  * @param The type of the elements that result from this {@code OneInputTransformation}
  */
 @Internal
-public class OneInputTransformation extends PhysicalTransformation {
+public class OneInputTransformation extends TransformationWithLineage {

Review Comment:
Looks like `LineageGraphUtils` only handles Source/Sink transformations, so why make `OneInputTransformation` extend `TransformationWithLineage`?
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
davidradl commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1613290123

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java:
@@ -20,13 +20,12 @@
 package org.apache.flink.streaming.api.lineage;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 import java.util.List;
 /**
- * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and
- * relationships from lineage and manage the relationship between jobs and tables.
+ * Job lineage graph that users can get sources, sinks and relationships from lineage and manage the

Review Comment:
> Thanks David for your comments. Yes, the documentation will be added after adding the job lineage listener which is more user facing. It is planned in this jira https://issues.apache.org/jira/browse/FLINK-33212. This PR only consider source/sink level lineage. Column level lineage is not included for this work, so internal transformations not need lineage info for now. Would you please elaborate more about "I assume a sink could be a source - so could be in both current lists"?

Hi Peter, usually we think of lineage assets as the nodes in the lineage (e.g. OpenLineage). So the asset could be a Kafka topic, and that topic could be used as a source for some flows and as a sink for other flows. I was wondering how this fits with lineage at the table level, where there could be a table defined as a sink and a table defined as a source on the same Kafka topic. I guess when exporting/exposing to OpenLineage, there could be many Flink tables referring to the same topic that would end up as one OpenLineage node. The natural way for Flink to store the lineage is at the table level rather than at the asset level. So, thinking about it, I think this is fine.
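David's point, that several table-level datasets can collapse into one asset-level node on export, can be illustrated by grouping datasets on (namespace, name), the pair OpenLineage uses to identify a dataset. This is a minimal sketch: the record types, the `"namespace/name"` key format and the sample namespace `kafka://broker:9092` are hypothetical, not part of Flink or of an OpenLineage client library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical types: a table-level dataset as Flink might report it, and an
// asset-level node as an exporter might build it. Neither is a real Flink or OpenLineage class.
final class AssetLineageSketch {

    /** A Flink table plus the physical asset (namespace + name) it reads from or writes to. */
    record TableDataset(String flinkTable, String namespace, String name, boolean usedAsSource) {}

    /** One asset-level node, e.g. a single Kafka topic, with the Flink tables that touch it. */
    record AssetNode(String namespace, String name, Set<String> readers, Set<String> writers) {}

    /** Groups table-level datasets into asset nodes keyed by "namespace/name". */
    static Map<String, AssetNode> toAssetNodes(List<TableDataset> datasets) {
        Map<String, AssetNode> nodes = new LinkedHashMap<>();
        for (TableDataset d : datasets) {
            AssetNode node =
                    nodes.computeIfAbsent(
                            d.namespace() + "/" + d.name(),
                            key -> new AssetNode(d.namespace(), d.name(), new TreeSet<>(), new TreeSet<>()));
            // The same asset may be read by one Flink table and written by another.
            if (d.usedAsSource()) {
                node.readers().add(d.flinkTable());
            } else {
                node.writers().add(d.flinkTable());
            }
        }
        return nodes;
    }
}
```

Keeping lineage at the table level inside Flink and grouping only at export time matches the conclusion of the comment.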
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on PR #24618:
URL: https://github.com/apache/flink/pull/24618#issuecomment-2096594755

@PatrickRen I have removed the schema facet and config facets, since this information is already provided by CatalogBaseTable. It greatly reduced the size of the PR. Would you please take one more round of review?
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on PR #24618:
URL: https://github.com/apache/flink/pull/24618#issuecomment-2083346268

@davidradl Thanks for reviewing this PR. This PR mainly handles source/sink-level lineage; column-level lineage will need further discussion in the community. I have resolved most of your comments.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1583476719

## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java:
@@ -52,7 +60,9 @@
  * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we

Review Comment:
As it is not in the scope of this PR, I would prefer to resolve it in a separate doc improvement PR.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1583475226

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/ModifyType.java:
@@ -0,0 +1,34 @@
+/*
+ * [Apache License 2.0 header omitted]
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The existing `ModifyType` should be marked as `PublicEvolving` and users can get it from table

Review Comment:
Can't agree more.
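The sink hunk in this thread builds the sink lineage vertex with `TableLineageUtils.convert(inputChangelogMode)`, which suggests the modify type is derived from the sink's input changelog mode. The sketch below shows one plausible mapping. It is an assumption, not the PR's implementation, and the enums are local stand-ins (the row kinds mirror Flink's `RowKind` values by name only).

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical mapping from a changelog mode (the set of row kinds a sink receives)
// to a modify type. The enums are local stand-ins, not Flink's RowKind or ModifyType.
final class ModifyTypeSketch {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum ModifyType { INSERT, UPDATE, DELETE }

    static ModifyType convert(Set<RowKind> changelogMode) {
        if (changelogMode.isEmpty()) {
            throw new IllegalArgumentException("A changelog mode must contain at least one row kind");
        }
        // Insert-only (append) streams map to INSERT.
        if (changelogMode.equals(EnumSet.of(RowKind.INSERT))) {
            return ModifyType.INSERT;
        }
        // Delete-only streams map to DELETE.
        if (changelogMode.equals(EnumSet.of(RowKind.DELETE))) {
            return ModifyType.DELETE;
        }
        // Assumption: any mixed changelog (upsert/retract) is reported as UPDATE.
        return ModifyType.UPDATE;
    }
}
```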
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1583451354

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableColumnLineageEdge.java:
@@ -0,0 +1,45 @@
+/*
+ * [Apache License 2.0 header omitted]
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+
+import java.util.List;
+
+/**
+ * Column lineage from source table columns to each sink table column, one sink column may be
+ * aggregated by multiple tables and columns.
+ */
+@PublicEvolving
+public interface TableColumnLineageEdge {
+
+    /** The dataset for source dataset. */
+    LineageDataset source();
+
+    /**
+     * Columns from one source table of {@link org.apache.flink.streaming.api.lineage.LineageEdge}
+     * to the sink column. Each sink column may be computed from multiple columns from source, for
+     * example, avg operator from two columns in the source.
+     */
+    List sourceColumns();
+
+    /* Sink table column. */
+    String sinkColumn();

Review Comment:
Yes, it is possible. If so, there will be multiple edges for the source column.
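The answer above (one source column feeding several sink columns produces several edges) makes the relation many-to-many overall: each edge fans in from a list of source columns to one sink column, and a source column may appear in many edges. This is a minimal sketch with a hypothetical record that mirrors the shape of `TableColumnLineageEdge` (source dataset reduced to a table name). `sinkColumnsFedBy` is an illustrative helper, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of TableColumnLineageEdge: the source dataset is reduced to a table name.
final class ColumnLineageSketch {

    /** One edge: several source columns of one table feed exactly one sink column. */
    record ColumnEdge(String sourceTable, List<String> sourceColumns, String sinkColumn) {}

    /** Sink columns derived from the given source column (the fan-out side of many-to-many). */
    static List<String> sinkColumnsFedBy(List<ColumnEdge> edges, String table, String column) {
        List<String> result = new ArrayList<>();
        for (ColumnEdge e : edges) {
            if (e.sourceTable().equals(table) && e.sourceColumns().contains(column)) {
                result.add(e.sinkColumn());
            }
        }
        return result;
    }
}
```

For a query like `SELECT price, price * qty AS total FROM orders`, the edges would be `(orders, [price]) -> price` and `(orders, [price, qty]) -> total`, so `price` feeds two sink columns through two separate edges.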
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1583446509

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java:
@@ -20,13 +20,12 @@
 package org.apache.flink.streaming.api.lineage;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 import java.util.List;
 /**
- * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and
- * relationships from lineage and manage the relationship between jobs and tables.
+ * Job lineage graph that users can get sources, sinks and relationships from lineage and manage the

Review Comment:
Thanks David for your comments. Yes, the documentation will be added once the job lineage listener, which is more user-facing, is in place. It is planned in this JIRA: https://issues.apache.org/jira/browse/FLINK-33212. This PR only considers source/sink-level lineage. Column-level lineage is not included in this work, so internal transformations do not need lineage info for now. Would you please elaborate on "I assume a sink could be a source - so could be in both current lists"?
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1583447345

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java:
@@ -42,7 +42,7 @@
  * @param The type of the elements that result from this {@code OneInputTransformation}
  */
 @Internal
-public class OneInputTransformation extends PhysicalTransformation {
+public class OneInputTransformation extends TransformationWithLineage {

Review Comment:
MultipleInputTransformation, KeyedMultipleInputTransformation and TwoInputTransformation are internal transformations. They will be useful for propagating column-level lineage info, but that is not in the scope of this PR.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
davidradl commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1582722531

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java:
@@ -20,13 +20,12 @@
 package org.apache.flink.streaming.api.lineage;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 import java.util.List;
 /**
- * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and
- * relationships from lineage and manage the relationship between jobs and tables.
+ * Job lineage graph that users can get sources, sinks and relationships from lineage and manage the

Review Comment:
Can we add documentation, please, so that users know how to use this (much needed) functionality? I am wondering why we have sources and sinks rather than just nodes. I assume a sink could be a source - so could be in both current lists.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
davidradl commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1582750064 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java: ## @@ -52,7 +60,9 @@ * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we Review Comment: I am struggling to understand this sentence. What must be 1? When we say "RowData is not serializable and the parallelism of table source may not be 1", why is that? A "because" clause would be really helpful for the reader.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
davidradl commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1582744606 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableColumnLineageEdge.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.lineage.LineageDataset; + +import java.util.List; + +/** + * Column lineage from source table columns to each sink table column, one sink column may be + * aggregated by multiple tables and columns. + */ +@PublicEvolving +public interface TableColumnLineageEdge { + +/** The dataset for source dataset. */ +LineageDataset source(); + +/** + * Columns from one source table of {@link org.apache.flink.streaming.api.lineage.LineageEdge} + * to the sink column. Each sink column may be computed from multiple columns from source, for + * example, avg operator from two columns in the source. + */ +List sourceColumns(); + +/* Sink table column. */ +String sinkColumn(); Review Comment: Can't one source column map to multiple sink columns? I am wondering whether this should be many-to-many.
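One way to read the current design, sketched below with a hypothetical simplified record (`ColumnEdge` and `ColumnLineage` are illustrative, not the PR's `TableColumnLineageEdge`): each edge maps several source columns to exactly one sink column, so a source column that feeds several sink columns simply appears in several edges, and the overall relation is still many-to-many. Whether the interface should model that directly is the open question above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified edge: many source columns -> exactly one sink column.
record ColumnEdge(String sourceTable, List<String> sourceColumns, String sinkColumn) {}

final class ColumnLineage {
    private ColumnLineage() {}

    // Inverts the per-sink-column edges: which sink columns does one source column feed?
    static List<String> sinkColumnsFedBy(List<ColumnEdge> edges, String table, String column) {
        List<String> result = new ArrayList<>();
        for (ColumnEdge edge : edges) {
            if (edge.sourceTable().equals(table) && edge.sourceColumns().contains(column)) {
                result.add(edge.sinkColumn());
            }
        }
        return result;
    }
}
```

For a query such as `INSERT INTO sink SELECT a AS x, a + b AS y FROM src`, this model would conceptually hold two edges (`[a] -> x` and `[a, b] -> y`), and asking which sink columns `a` feeds returns both `x` and `y`.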
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
davidradl commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1582743283 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/ModifyType.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * The existing `ModifyType` should be marked as `PublicEvolving` and users can get it from table Review Comment: I find this comment misleading. If there is another ModifyType, it should be referenced with a link in the Javadoc. Is the "should" a TODO, or is it no longer needed? I see SinkModifyOperation has an identical ModifyType, which I assume you are referring to. Can we not use that enum, or refactor to a single enum?
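If the two enums cannot be merged right away, one stopgap is converting between them by constant name. The sketch below uses hypothetical enums (`OperationModifyType`, `LineageModifyType`) and assumes both declare the same constants; the `INSERT`/`UPDATE`/`DELETE` values are an assumption for illustration.

```java
// Hypothetical stand-ins for two enums that declare identical constants.
enum OperationModifyType { INSERT, UPDATE, DELETE }

enum LineageModifyType { INSERT, UPDATE, DELETE }

final class ModifyTypes {
    private ModifyTypes() {}

    // Converts by constant name. If the enums drift apart, valueOf throws
    // IllegalArgumentException at runtime rather than failing at compile time.
    static LineageModifyType toLineage(OperationModifyType type) {
        return LineageModifyType.valueOf(type.name());
    }
}
```

Because a name-based mapping only fails at runtime when the two enums diverge, it arguably strengthens the case for the single shared enum suggested in the review.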
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
davidradl commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1582733973 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java: ## @@ -42,7 +42,7 @@ * @param The type of the elements that result from this {@code OneInputTransformation} */ @Internal -public class OneInputTransformation extends PhysicalTransformation { +public class OneInputTransformation extends TransformationWithLineage { Review Comment: I can see that some PhysicalTransformations now subclass TransformationWithLineage. How did you decide which ones to use? I am thinking we would want lineage for MultipleInputTransformation, KeyedMultipleInputTransformation, TwoInputTransformation and maybe others?
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on PR #24618: URL: https://github.com/apache/flink/pull/24618#issuecomment-2081875974 @PatrickRen Thanks for reviewing the PR. For testing purposes, I only added lineage provider implementations for the values-related source functions and input format. I will add a lineage provider for Hive in a separate PR.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1582530548 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableDataSetSchemaFacet.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaField; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +/** Default implementation for DatasetSchemaFacet. */ +public class TableDataSetSchemaFacet implements DatasetSchemaFacet { Review Comment: I agree. It is just for exposing the data in a structured way. After the implementation, I feel we probably don't need to expose CatalogContext and CatalogBaseTable to users.
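As an illustration of "exposing the data in a structured way" without handing out catalog objects, here is a minimal sketch of a schema facet that exposes only column names and type strings in declaration order. `Column` and `SchemaFacet` are hypothetical simplifications; the PR's facet works with Flink's `DatasetSchemaField`/`LogicalType` types instead of plain strings.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical column description; a real implementation would derive it from the resolved schema.
record Column(String name, String type) {}

// Sketch of a schema facet that exposes column names and types only,
// instead of exposing CatalogContext or CatalogBaseTable.
final class SchemaFacet {
    private final Map<String, String> fields;

    SchemaFacet(List<Column> columns) {
        // LinkedHashMap keeps the table's column order stable for consumers.
        Map<String, String> ordered = new LinkedHashMap<>();
        for (Column column : columns) {
            ordered.put(column.name(), column.type());
        }
        this.fields = Collections.unmodifiableMap(ordered);
    }

    Map<String, String> fields() {
        return fields;
    }
}
```

The design choice here is that lineage consumers see a read-only, serializable view of the schema, which keeps catalog internals out of the public lineage API.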
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1581716086 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java: ## @@ -133,6 +139,21 @@ protected Transformation translateToPlanInternal( outputTypeInfo, sourceParallelism, sourceParallelismConfigured); + +LineageDataset tableLineageDataset = Review Comment: Thanks for the proposed idea. Yes, I think reading the lineage info from the connector makes total sense. If the connector doesn't implement LineageVertexProvider, we will leave the namespace empty here.
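The fallback described above can be sketched as an `instanceof` check on the connector object. `LineageProvider` is a hypothetical single-method stand-in for Flink's `LineageVertexProvider` (the real interface returns a `LineageVertex`, not a namespace string), so treat this as an illustration of the control flow only.

```java
// Hypothetical stand-in for LineageVertexProvider, reduced to the namespace it would supply.
interface LineageProvider {
    String namespace();
}

final class NamespaceResolver {
    private NamespaceResolver() {}

    // Use the connector-provided namespace when the connector opts in;
    // otherwise leave the namespace empty, as described in the comment.
    static String resolveNamespace(Object connector) {
        if (connector instanceof LineageProvider provider) {
            return provider.namespace();
        }
        return "";
    }
}
```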
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1581321727 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java: ## @@ -34,6 +35,7 @@ public abstract class PhysicalTransformation extends Transformation { private boolean supportsConcurrentExecutionAttempts = true; +private LineageVertex lineageVertex; Review Comment: Makes sense. Added a TransformationWithLineage class for this purpose.
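The idea of moving the field out of `PhysicalTransformation` into an intermediate class can be sketched as below. All classes here are heavily simplified, hypothetical stand-ins (no generics, no parallelism or type information), meant only to show that transformations which do not extend the intermediate class stay free of the lineage field.

```java
// Hypothetical, heavily simplified stand-ins for Flink's transformation hierarchy.
abstract class Transformation {
    private final String name;

    Transformation(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

abstract class PhysicalTransformation extends Transformation {
    PhysicalTransformation(String name) {
        super(name);
    }
}

// Hypothetical lineage vertex holding just a dataset name.
record LineageVertex(String datasetName) {}

// Intermediate class: only subclasses that can carry lineage get the field.
abstract class TransformationWithLineage extends PhysicalTransformation {
    private LineageVertex lineageVertex; // null until the planner attaches one

    TransformationWithLineage(String name) {
        super(name);
    }

    LineageVertex getLineageVertex() {
        return lineageVertex;
    }

    void setLineageVertex(LineageVertex lineageVertex) {
        this.lineageVertex = lineageVertex;
    }
}

final class SourceTransformation extends TransformationWithLineage {
    SourceTransformation(String name) {
        super(name);
    }
}
```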
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1581715242 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ## @@ -123,6 +124,7 @@ public class StreamGraph implements Pipeline { private CheckpointStorage checkpointStorage; private Set> iterationSourceSinkPairs; private InternalTimeServiceManager.Provider timerServiceProvider; +private LineageGraph lineageGraph; Review Comment: As we discussed offline, we will keep it here.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1581326630 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java: ## @@ -90,6 +90,7 @@ protected Transformation createConversionTransformationIfNeeded( final RowType outputType = (RowType) getOutputType(); final Transformation transformation; final int[] fieldIndexes = computeIndexMapping(true); + Review Comment: Yes.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1581321727 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java: ## @@ -34,6 +35,7 @@ public abstract class PhysicalTransformation extends Transformation { private boolean supportsConcurrentExecutionAttempts = true; +private LineageVertex lineageVertex; Review Comment: Makes sense. Added a LineagedTransformation class for this purpose.
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
PatrickRen commented on code in PR #24618: URL: https://github.com/apache/flink/pull/24618#discussion_r1575873093 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java: ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.DefaultCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.listener.CatalogContext; +import org.apache.flink.table.factories.FactoryUtil; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** Implementation for TableLineageDataSet. 
*/ +public class TableLineageDatasetImpl implements TableLineageDataset { +@JsonProperty private String name; +@JsonProperty private String namespace; +private CatalogContext catalogContext; +private CatalogBaseTable catalogBaseTable; +@JsonProperty private ObjectPath objectPath; +@JsonProperty private Map facets; + +public TableLineageDatasetImpl(ContextResolvedTable contextResolvedTable) { +this.name = contextResolvedTable.getIdentifier().asSummaryString(); +this.namespace = inferNamespace(contextResolvedTable.getTable()).orElse(""); Review Comment: I'm not sure the implementation here matches the definition on the interface. From the Javadoc of LineageDataset: https://github.com/apache/flink/blob/0b2e98803542365136a212580a8a61078f7acaca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java#L32 Let's take the JDBC connector as an example. My assumption is that the namespace should describe the URL of the database, or at least some identifier that can tell the difference between different DB instances. Here the implementation only writes `jdbc` as the namespace. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ## @@ -123,6 +124,7 @@ public class StreamGraph implements Pipeline { private CheckpointStorage checkpointStorage; private Set> iterationSourceSinkPairs; private InternalTimeServiceManager.Provider timerServiceProvider; +private LineageGraph lineageGraph; Review Comment: The only usage of this field is in tests. Is it possible not to introduce it in `StreamGraph`? ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableDataSetSchemaFacet.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaField; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +/** Default implementation for DatasetSchemaFacet. */ +public class TableDataSetSchemaFacet implements DatasetSchemaFacet { Revie
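A minimal sketch of the namespace rule suggested in the review, assuming the connector options are available as a plain string map. The `connector` and `url` keys match the Flink JDBC SQL connector's documented options; the helper class, its name and the fallback to the bare connector identifier (today's behavior) are assumptions for illustration. Other connectors would need their own rules, for example Kafka bootstrap servers.

```java
import java.util.Map;

final class NamespaceInference {
    private NamespaceInference() {}

    // Hypothetical rule: prefer an option that identifies the concrete instance
    // (for JDBC, the 'url' option); otherwise fall back to the connector identifier.
    static String inferNamespace(Map<String, String> options) {
        String connector = options.getOrDefault("connector", "");
        if ("jdbc".equals(connector) && options.containsKey("url")) {
            return options.get("url");
        }
        return connector;
    }
}
```

With this rule, two tables on `jdbc:mysql://db1:3306/shop` and `jdbc:mysql://db2:3306/shop` get distinct namespaces, which the bare `jdbc` value cannot express.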
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on PR #24618: URL: https://github.com/apache/flink/pull/24618#issuecomment-2037597505 @flinkbot run azure
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
flinkbot commented on PR #24618: URL: https://github.com/apache/flink/pull/24618#issuecomment-2036689498 ## CI report: * 3a01cfd18673178c04d9dad319236c8bac6bddf7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu commented on PR #24618: URL: https://github.com/apache/flink/pull/24618#issuecomment-2035113099 @flinkbot run azure
[PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu opened a new pull request, #24618: URL: https://github.com/apache/flink/pull/24618
## What is the purpose of the change
1. Add a table lineage vertex to transformations in the planner. The final LineageGraph is generated from the transformations and put into the StreamGraph. The lineage graph will be published to the lineage listener in a follow-up PR.
2. Deprecated table sources and sinks are not considered, as they do not provide enough information to derive the name and namespace of a lineage dataset.
## Brief change log
- Add table lineage interfaces and default implementations.
- Create lineage vertices and add them to transformations during the conversion from physical plan to transformations.
## Verifying this change
1. Added TableLineageGraphTest for both stream and batch.
2. Added LineageGraph verification in TransformationsTest for legacy sources.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
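The first point of the description (deriving the LineageGraph from transformations) can be pictured as a walk upstream from each sink to every reachable source that carries a lineage vertex. The sketch below is a hypothetical simplification: `Node`, `LineageEdge` and `LineageGraphBuilder` are illustrative types, not Flink's `Transformation` or lineage classes, and the real planner logic may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical simplified transformation: an optional lineage vertex name plus its inputs.
record Node(String lineageVertex, List<Node> inputs) {}

// Hypothetical source-to-sink lineage edge between vertex names.
record LineageEdge(String source, String sink) {}

final class LineageGraphBuilder {
    private LineageGraphBuilder() {}

    // For each sink with lineage info, walk upstream and connect it to every reachable
    // source (a node without inputs that carries a lineage vertex).
    static List<LineageEdge> build(List<Node> sinks) {
        List<LineageEdge> edges = new ArrayList<>();
        for (Node sink : sinks) {
            if (sink.lineageVertex() == null) {
                continue; // no lineage info, e.g. a deprecated table sink
            }
            // Identity-based visited set: shared upstream nodes are walked only once per sink.
            Set<Node> visited = Collections.newSetFromMap(new IdentityHashMap<>());
            Deque<Node> stack = new ArrayDeque<>(sink.inputs());
            while (!stack.isEmpty()) {
                Node node = stack.pop();
                if (!visited.add(node)) {
                    continue;
                }
                if (node.inputs().isEmpty() && node.lineageVertex() != null) {
                    edges.add(new LineageEdge(node.lineageVertex(), sink.lineageVertex()));
                }
                node.inputs().forEach(stack::push);
            }
        }
        return edges;
    }
}
```

For a join of two sources feeding one sink, this yields one edge per source; intermediate operators without a lineage vertex are traversed but contribute no edges.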