This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 960363cd3c6 [FLINK-33211][table] support flink table lineage (#24618)
960363cd3c6 is described below

commit 960363cd3c6c82f7e56ef781295756105f7b5eba
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Tue Jul 2 18:58:18 2024 -0700

    [FLINK-33211][table] support flink table lineage (#24618)
---
 .../api/functions/source/FromElementsFunction.java |  30 ++++-
 .../flink/streaming/api/graph/StreamGraph.java     |   9 ++
 .../streaming/api/graph/StreamGraphGenerator.java  |   6 +-
 ...ineageGraph.java => DefaultLineageDataset.java} |  43 +++---
 .../{LineageGraph.java => DefaultLineageEdge.java} |  35 ++---
 .../streaming/api/lineage/DefaultLineageGraph.java |   8 +-
 .../flink/streaming/api/lineage/LineageGraph.java  |   5 +-
 .../streaming/api/lineage/LineageGraphUtils.java   |  86 ++++++++++++
 .../transformations/LegacySinkTransformation.java  |   2 +-
 .../LegacySourceTransformation.java                |   2 +-
 .../api/transformations/SinkTransformation.java    |   2 +-
 .../api/transformations/SourceTransformation.java  |   2 +-
 .../transformations/TransformationWithLineage.java |  75 +++++++++++
 .../translators/SinkTransformationTranslator.java  |   2 +-
 .../apache/flink/table/operations/ModifyType.java  |  22 +--
 .../table/operations/SinkModifyOperation.java      |  10 --
 flink-table/flink-table-planner/pom.xml            |   9 +-
 .../table/planner/lineage/TableLineageDataset.java |  30 ++---
 .../planner/lineage/TableLineageDatasetImpl.java   | 100 ++++++++++++++
 .../table/planner/lineage/TableLineageUtils.java   |  93 +++++++++++++
 .../planner/lineage/TableSinkLineageVertex.java    |  27 ++--
 .../lineage/TableSinkLineageVertexImpl.java        |  38 +++---
 .../planner/lineage/TableSourceLineageVertex.java  |  23 +---
 .../lineage/TableSourceLineageVertexImpl.java      |  47 +++++++
 .../operations/SqlNodeToOperationConversion.java   |   8 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  54 ++++++--
 .../exec/common/CommonExecTableSourceScan.java     |  61 +++++++--
 .../flink/connector/source/ValuesSource.java       |  44 +++++-
 .../factories/TestValuesRuntimeFunctions.java      |  96 +++++++++++--
 .../plan/batch/sql/TableLineageGraphTest.java      |  32 ++---
 .../plan/common/TableLineageGraphTestBase.java     | 150 +++++++++++++++++++++
 .../plan/nodes/exec/TransformationsTest.java       |  20 +++
 .../plan/stream/sql/TableLineageGraphTest.java     |  32 ++---
 .../test/resources/lineage-graph/query-batch.json  |  70 ++++++++++
 .../test/resources/lineage-graph/query-stream.json |  70 ++++++++++
 .../test/resources/lineage-graph/union-batch.json  | 117 ++++++++++++++++
 .../test/resources/lineage-graph/union-stream.json | 117 ++++++++++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |  17 +++
 .../operators/values/ValuesInputFormat.java        |  29 +++-
 pom.xml                                            |   1 +
 40 files changed, 1413 insertions(+), 211 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index aff13245afa..c183b6494e2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -25,12 +25,18 @@ import 
org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.Preconditions;
@@ -45,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 
@@ -65,8 +72,11 @@ import java.util.Objects;
 @Deprecated
 @PublicEvolving
 public class FromElementsFunction<T>
-        implements SourceFunction<T>, CheckpointedFunction, 
OutputTypeConfigurable<T> {
-
+        implements SourceFunction<T>,
+                CheckpointedFunction,
+                OutputTypeConfigurable<T>,
+                LineageVertexProvider {
+    private static final String LINEAGE_NAMESPACE = 
"values://FromElementsFunction";
     private static final long serialVersionUID = 1L;
 
     /** The (de)serializer to be used for the data elements. */
@@ -305,4 +315,20 @@ public class FromElementsFunction<T>
             }
         }
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        return new SourceLineageVertex() {
+            @Override
+            public Boundedness boundedness() {
+                return Boundedness.BOUNDED;
+            }
+
+            @Override
+            public List<LineageDataset> datasets() {
+                return Arrays.asList(
+                        new DefaultLineageDataset("", LINEAGE_NAMESPACE, new 
HashMap<>()));
+            }
+        };
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 408bf00fd11..e6e9c749b9d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -124,6 +124,7 @@ public class StreamGraph implements Pipeline {
     private CheckpointStorage checkpointStorage;
     private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
     private InternalTimeServiceManager.Provider timerServiceProvider;
+    private LineageGraph lineageGraph;
     private JobType jobType = JobType.STREAMING;
     private Map<String, ResourceProfile> slotSharingGroupResources;
     private PipelineOptions.VertexDescriptionMode descriptionMode =
@@ -192,6 +193,14 @@ public class StreamGraph implements Pipeline {
         this.jobName = jobName;
     }
 
+    public LineageGraph getLineageGraph() {
+        return lineageGraph;
+    }
+
+    public void setLineageGraph(LineageGraph lineageGraph) {
+        this.lineageGraph = lineageGraph;
+    }
+
     public void setStateBackend(StateBackend backend) {
         this.stateBackend = backend;
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index a811cb2802e..d9ac8e59fbc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -46,6 +46,8 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.lineage.LineageGraph;
+import org.apache.flink.streaming.api.lineage.LineageGraphUtils;
 import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage;
 import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
@@ -281,11 +283,13 @@ public class StreamGraphGenerator {
         for (Transformation<?> transformation : transformations) {
             transform(transformation);
         }
-
         streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
 
         setFineGrainedGlobalStreamExchangeMode(streamGraph);
 
+        LineageGraph lineageGraph = 
LineageGraphUtils.convertToLineageGraph(transformations);
+        streamGraph.setLineageGraph(lineageGraph);
+
         for (StreamNode node : streamGraph.getStreamNodes()) {
             if 
(node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing))
 {
                 for (StreamEdge edge : node.getInEdges()) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java
similarity index 52%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java
index 7add1ce88ed..984eb17c54b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java
@@ -19,23 +19,36 @@
 
 package org.apache.flink.streaming.api.lineage;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.annotation.Internal;
 
-import java.util.List;
+import java.util.Map;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
-@PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
+/** Default implementation for {@link LineageDataset}. */
+@Internal
+public class DefaultLineageDataset implements LineageDataset {
+    private String name;
+    private String namespace;
+    private Map<String, LineageDatasetFacet> facets;
+
+    public DefaultLineageDataset(
+            String name, String namespace, Map<String, LineageDatasetFacet> 
facets) {
+        this.name = name;
+        this.namespace = namespace;
+        this.facets = facets;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
 
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+    @Override
+    public String namespace() {
+        return namespace;
+    }
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    @Override
+    public Map<String, LineageDatasetFacet> facets() {
+        return facets;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageEdge.java
similarity index 53%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageEdge.java
index 7add1ce88ed..b672a9ddd39 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageEdge.java
@@ -19,23 +19,28 @@
 
 package org.apache.flink.streaming.api.lineage;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.annotation.Internal;
 
-import java.util.List;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
-@PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
+/** Default implementation for {@link LineageEdge}. */
+@Internal
+public class DefaultLineageEdge implements LineageEdge {
+    @JsonProperty private SourceLineageVertex sourceLineageVertex;
+    @JsonProperty private LineageVertex sinkVertex;
+
+    public DefaultLineageEdge(SourceLineageVertex sourceLineageVertex, 
LineageVertex sinkVertex) {
+        this.sourceLineageVertex = sourceLineageVertex;
+        this.sinkVertex = sinkVertex;
+    }
 
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+    @Override
+    public SourceLineageVertex source() {
+        return this.sourceLineageVertex;
+    }
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    @Override
+    public LineageVertex sink() {
+        return this.sinkVertex;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java
index 15e70bfec67..24109cd8c32 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.lineage;
 
 import org.apache.flink.annotation.Internal;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,9 +32,9 @@ import java.util.Set;
 /** Default implementation for {@link LineageGraph}. */
 @Internal
 public class DefaultLineageGraph implements LineageGraph {
-    private final List<LineageEdge> lineageEdges;
-    private final List<SourceLineageVertex> sources;
-    private final List<LineageVertex> sinks;
+    @JsonProperty private final List<LineageEdge> lineageEdges;
+    @JsonProperty private final List<SourceLineageVertex> sources;
+    @JsonProperty private final List<LineageVertex> sinks;
 
     private DefaultLineageGraph(List<LineageEdge> lineageEdges) {
         this.lineageEdges = lineageEdges;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
index 7add1ce88ed..a70f40eea72 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
@@ -20,13 +20,12 @@
 package org.apache.flink.streaming.api.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import java.util.List;
 
 /**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
+ * Job lineage graph from which users can get sources, sinks and relationships, and manage the
+ * relationship between jobs and tables.
  */
 @PublicEvolving
 public interface LineageGraph {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraphUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraphUtils.java
new file mode 100644
index 00000000000..550a57fa432
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraphUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** Utils for building lineage graph from transformations. */
+@Internal
+public class LineageGraphUtils {
+
+    /** Convert transforms to LineageGraph. */
+    public static LineageGraph convertToLineageGraph(List<Transformation<?>> 
transformations) {
+        DefaultLineageGraph.LineageGraphBuilder builder = 
DefaultLineageGraph.builder();
+        for (Transformation<?> transformation : transformations) {
+            List<LineageEdge> edges = processSink(transformation);
+            for (LineageEdge lineageEdge : edges) {
+                builder.addLineageEdge(lineageEdge);
+            }
+        }
+        return builder.build();
+    }
+
+    private static List<LineageEdge> processSink(Transformation<?> 
transformation) {
+        List<LineageEdge> lineageEdges = new ArrayList<>();
+        LineageVertex sinkLineageVertex = null;
+        if (transformation instanceof SinkTransformation) {
+            sinkLineageVertex = ((SinkTransformation<?, ?>) 
transformation).getLineageVertex();
+        } else if (transformation instanceof LegacySinkTransformation) {
+            sinkLineageVertex = ((LegacySinkTransformation) 
transformation).getLineageVertex();
+        }
+
+        if (sinkLineageVertex != null) {
+            List<Transformation<?>> predecessors = 
transformation.getTransitivePredecessors();
+            for (Transformation<?> predecessor : predecessors) {
+                Optional<SourceLineageVertex> sourceOpt = 
processSource(predecessor);
+                if (sourceOpt.isPresent()) {
+                    lineageEdges.add(new DefaultLineageEdge(sourceOpt.get(), 
sinkLineageVertex));
+                }
+            }
+        }
+        return lineageEdges;
+    }
+
+    private static Optional<SourceLineageVertex> 
processSource(Transformation<?> transformation) {
+        if (transformation instanceof SourceTransformation) {
+            if (((SourceTransformation) transformation).getLineageVertex() != 
null) {
+                return Optional.of(
+                        (SourceLineageVertex)
+                                ((SourceTransformation) 
transformation).getLineageVertex());
+            }
+        } else if (transformation instanceof LegacySourceTransformation) {
+            if (((LegacySourceTransformation) 
transformation).getLineageVertex() != null) {
+                return Optional.of(
+                        (SourceLineageVertex)
+                                ((LegacySourceTransformation) 
transformation).getLineageVertex());
+            }
+        }
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
index fcd753c13b4..745af25d238 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
@@ -46,7 +46,7 @@ import java.util.List;
  * @param <T> The type of the elements in the input {@code 
LegacySinkTransformation}
  */
 @Internal
-public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {
+public class LegacySinkTransformation<T> extends TransformationWithLineage<T> {
 
     private final Transformation<T> input;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
index 9443a3a14ea..51b101e47b6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
@@ -40,7 +40,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type of the elements that this source produces
  */
 @Internal
-public class LegacySourceTransformation<T> extends PhysicalTransformation<T>
+public class LegacySourceTransformation<T> extends TransformationWithLineage<T>
         implements WithBoundedness {
 
     private final StreamOperatorFactory<T> operatorFactory;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
index a4b4310588e..86ceff45220 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
@@ -43,7 +43,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <OutputT> The output type of the {@link Sink}
  */
 @Internal
-public class SinkTransformation<InputT, OutputT> extends 
PhysicalTransformation<OutputT> {
+public class SinkTransformation<InputT, OutputT> extends 
TransformationWithLineage<OutputT> {
 
     private final DataStream<InputT> inputStream;
     private final Sink<InputT> sink;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index 0260da43b82..c5190d4477d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -37,7 +37,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /** A {@link PhysicalTransformation} for {@link Source}. */
 @Internal
 public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
-        extends PhysicalTransformation<OUT> implements WithBoundedness {
+        extends TransformationWithLineage<OUT> implements WithBoundedness {
 
     private final Source<OUT, SplitT, EnumChkT> source;
     private final WatermarkStrategy<OUT> watermarkStrategy;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TransformationWithLineage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TransformationWithLineage.java
new file mode 100644
index 00000000000..91d0d8776eb
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TransformationWithLineage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+
+/**
+ * A {@link Transformation} that contains lineage information.
+ *
+ * @param <T> The type of the elements that result from this {@code 
Transformation}
+ * @see Transformation
+ */
+@Internal
+public abstract class TransformationWithLineage<T> extends 
PhysicalTransformation<T> {
+    private LineageVertex lineageVertex;
+
+    /**
+     * Creates a new {@code Transformation} with the given name, output type 
and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown 
in Visualizations and
+     *     the Log
+     * @param outputType The output type of this {@code Transformation}
+     * @param parallelism The parallelism of this {@code Transformation}
+     */
+    TransformationWithLineage(String name, TypeInformation<T> outputType, int 
parallelism) {
+        super(name, outputType, parallelism);
+    }
+
+    /**
+     * Creates a new {@code Transformation} with the given name, output type 
and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown 
in Visualizations and
+     *     the Log
+     * @param outputType The output type of this {@code Transformation}
+     * @param parallelism The parallelism of this {@code Transformation}
+     * @param parallelismConfigured If true, the parallelism of the 
transformation is explicitly set
+     *     and should be respected. Otherwise the parallelism can be changed 
at runtime.
+     */
+    TransformationWithLineage(
+            String name,
+            TypeInformation<T> outputType,
+            int parallelism,
+            boolean parallelismConfigured) {
+        super(name, outputType, parallelism, parallelismConfigured);
+    }
+
+    /** Returns the lineage vertex of this {@code Transformation}. */
+    public LineageVertex getLineageVertex() {
+        return lineageVertex;
+    }
+
+    /** Change the lineage vertex of this {@code Transformation}. */
+    public void setLineageVertex(LineageVertex lineageVertex) {
+        this.lineageVertex = lineageVertex;
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index ffe0f3e28b1..14f03529d17 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for 
the {@link
- * org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ * SinkTransformation}.
  */
 @Internal
 public class SinkTransformationTranslator<Input, Output>
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyType.java
similarity index 60%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyType.java
index 7add1ce88ed..82892d3180d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyType.java
@@ -14,28 +14,18 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.flink.streaming.api.lineage;
+package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-
-import java.util.List;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
+/** The type of sink modification. */
 @PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
+public enum ModifyType {
+    INSERT,
 
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+    UPDATE,
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    DELETE
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
index 82ff8c1efcc..0b5e8ae3c7b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
@@ -163,14 +163,4 @@ public class SinkModifyOperation implements 
ModifyOperation {
                 Collections.singletonList(child),
                 Operation::asSummaryString);
     }
-
-    /** The type of sink modification. */
-    @Internal
-    public enum ModifyType {
-        INSERT,
-
-        UPDATE,
-
-        DELETE
-    }
 }
diff --git a/flink-table/flink-table-planner/pom.xml 
b/flink-table/flink-table-planner/pom.xml
index 08b506bd538..e1ec119de09 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -235,7 +235,14 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
-  </dependencies>
+
+               <dependency>
+                       <groupId>net.javacrumbs.json-unit</groupId>
+                       <artifactId>json-unit-assertj</artifactId>
+                       <version>2.23.0</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
 
        <build>
                <plugins>
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDataset.java
similarity index 54%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDataset.java
index 7add1ce88ed..4748cd1444f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDataset.java
@@ -14,28 +14,24 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.flink.streaming.api.lineage;
+package org.apache.flink.table.planner.lineage;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.listener.CatalogContext;
 
-import java.util.List;
+/** Basic table lineage dataset which has catalog context and table in it. */
+public interface TableLineageDataset extends LineageDataset {
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
-@PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
+    /* The catalog context of the table lineage dataset. */
+    CatalogContext catalogContext();
 
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+    /* The table of the lineage dataset. */
+    CatalogBaseTable table();
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    /* Database name and table name of the table lineage dataset. */
+    ObjectPath objectPath();
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
new file mode 100644
index 00000000000..98d0274e689
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.listener.CatalogContext;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Implementation of {@link TableLineageDataset}. */
+public class TableLineageDatasetImpl implements TableLineageDataset {
+    @JsonProperty private String name;
+    @JsonProperty private String namespace;
+    private CatalogContext catalogContext;
+    private CatalogBaseTable catalogBaseTable;
+    @JsonProperty private ObjectPath objectPath;
+    @JsonProperty private Map<String, LineageDatasetFacet> facets;
+
+    public TableLineageDatasetImpl(
+            ContextResolvedTable contextResolvedTable, 
Optional<LineageDataset> lineageDatasetOpt) {
+        this.name = contextResolvedTable.getIdentifier().asSummaryString();
+        this.namespace =
+                lineageDatasetOpt.map(lineageDataset -> 
lineageDataset.namespace()).orElse("");
+        this.catalogContext =
+                CatalogContext.createContext(
+                        contextResolvedTable.getCatalog().isPresent()
+                                ? ((AbstractCatalog) 
contextResolvedTable.getCatalog().get())
+                                        .getName()
+                                : "",
+                        contextResolvedTable.getCatalog().orElse(null));
+        this.catalogBaseTable = contextResolvedTable.getTable();
+        this.objectPath =
+                contextResolvedTable.isAnonymous()
+                        ? null
+                        : contextResolvedTable.getIdentifier().toObjectPath();
+        this.facets = new HashMap<>();
+        if (lineageDatasetOpt.isPresent()) {
+            this.facets.putAll(lineageDatasetOpt.get().facets());
+        }
+    }
+
+    public void addLineageDatasetFacet(LineageDatasetFacet facet) {
+        facets.put(facet.name(), facet);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String namespace() {
+        return namespace;
+    }
+
+    @Override
+    public Map<String, LineageDatasetFacet> facets() {
+        return facets;
+    }
+
+    @Override
+    public CatalogContext catalogContext() {
+        return catalogContext;
+    }
+
+    @Override
+    public CatalogBaseTable table() {
+        return catalogBaseTable;
+    }
+
+    @Override
+    public ObjectPath objectPath() {
+        return objectPath;
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageUtils.java
new file mode 100644
index 00000000000..f60c4a358f2
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.operations.ModifyType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Util class for building table lineage graph. */
+public class TableLineageUtils {
+
+    /** Extract optional lineage info from output format, sink, or sink 
function. */
+    public static Optional<LineageVertex> extractLineageDataset(@Nullable 
Object object) {
+        if (object != null && object instanceof LineageVertexProvider) {
+            return Optional.of(((LineageVertexProvider) 
object).getLineageVertex());
+        }
+
+        return Optional.empty();
+    }
+
+    public static LineageDataset createTableLineageDataset(
+            ContextResolvedTable contextResolvedTable, Optional<LineageVertex> 
lineageDataset) {
+        String name = contextResolvedTable.getIdentifier().asSummaryString();
+        TableLineageDatasetImpl tableLineageDataset =
+                new TableLineageDatasetImpl(
+                        contextResolvedTable, findLineageDataset(name, 
lineageDataset));
+
+        return tableLineageDataset;
+    }
+
+    public static ModifyType convert(ChangelogMode inputChangelogMode) {
+        if (inputChangelogMode.containsOnly(RowKind.INSERT)) {
+            return ModifyType.INSERT;
+        } else if (inputChangelogMode.containsOnly(RowKind.DELETE)) {
+            return ModifyType.DELETE;
+        } else {
+            return ModifyType.UPDATE;
+        }
+    }
+
+    private static Optional<LineageDataset> findLineageDataset(
+            String name, Optional<LineageVertex> lineageVertexOpt) {
+        if (lineageVertexOpt.isPresent()) {
+            LineageVertex lineageVertex = lineageVertexOpt.get();
+            if (lineageVertex.datasets().size() == 1) {
+                return Optional.of(lineageVertex.datasets().get(0));
+            }
+
+            for (LineageDataset dataset : lineageVertex.datasets()) {
+                if (dataset.name().equals(name)) {
+                    return Optional.of(dataset);
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private static Map<String, String> extractOptions(CatalogBaseTable 
catalogBaseTable) {
+        try {
+            return catalogBaseTable.getOptions();
+        } catch (Exception e) {
+            return new HashMap<>();
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertex.java
similarity index 60%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertex.java
index 7add1ce88ed..bd92dadceac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertex.java
@@ -14,28 +14,21 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.flink.streaming.api.lineage;
+package org.apache.flink.table.planner.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-
-import java.util.List;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.table.operations.ModifyType;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
+/** Sink lineage vertex for table. */
 @PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
-
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+public interface TableSinkLineageVertex extends LineageVertex {
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    /**
+     * Modify type of the statement (INSERT/UPDATE/DELETE); listeners can use this type
+     * to distinguish different sinks and decide whether a sink should be added to the lineage.
+     */
+    ModifyType modifyType();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertexImpl.java
similarity index 50%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertexImpl.java
index 7add1ce88ed..65baec88c13 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertexImpl.java
@@ -14,28 +14,34 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.flink.streaming.api.lineage;
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.table.operations.ModifyType;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.List;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
-@PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
+/** Implementation of TableSinkLineageVertex. */
+public class TableSinkLineageVertexImpl implements TableSinkLineageVertex {
+    @JsonProperty private List<LineageDataset> datasets;
+    @JsonProperty private ModifyType modifyType;
+
+    public TableSinkLineageVertexImpl(List<LineageDataset> datasets, 
ModifyType modifyType) {
+        this.datasets = datasets;
+        this.modifyType = modifyType;
+    }
 
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+    @Override
+    public List<LineageDataset> datasets() {
+        return datasets;
+    }
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    @Override
+    public ModifyType modifyType() {
+        return modifyType;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertex.java
similarity index 60%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertex.java
index 7add1ce88ed..d4e19985666 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertex.java
@@ -14,28 +14,13 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.flink.streaming.api.lineage;
+package org.apache.flink.table.planner.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-
-import java.util.List;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
+/** Source lineage vertex for table. */
 @PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
-
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
-
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
-}
+public interface TableSourceLineageVertex extends SourceLineageVertex {}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertexImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertexImpl.java
new file mode 100644
index 00000000000..83661226546
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertexImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Implementation of TableSourceLineageVertex. */
+public class TableSourceLineageVertexImpl implements TableSourceLineageVertex {
+    @JsonProperty private List<LineageDataset> datasets;
+    @JsonProperty private Boundedness boundedness;
+
+    public TableSourceLineageVertexImpl(List<LineageDataset> datasets, 
Boundedness boundedness) {
+        this.datasets = datasets;
+        this.boundedness = boundedness;
+    }
+
+    @Override
+    public List<LineageDataset> datasets() {
+        return this.datasets;
+    }
+
+    @Override
+    public Boundedness boundedness() {
+        return boundedness;
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 0e3aac1f060..f73b314d56a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -122,6 +122,7 @@ import 
org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.ModifyType;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -1353,7 +1354,7 @@ public class SqlNodeToOperationConversion {
                 contextResolvedTable,
                 queryOperation,
                 null, // targetColumns
-                SinkModifyOperation.ModifyType.DELETE);
+                ModifyType.DELETE);
     }
 
     private Operation convertUpdate(SqlUpdate sqlUpdate) {
@@ -1382,10 +1383,7 @@ public class SqlNodeToOperationConversion {
                 getTargetColumnIndices(contextResolvedTable, 
sqlUpdate.getTargetColumnList());
 
         return new SinkModifyOperation(
-                contextResolvedTable,
-                queryOperation,
-                columnIndices,
-                SinkModifyOperation.ModifyType.UPDATE);
+                contextResolvedTable, queryOperation, columnIndices, 
ModifyType.UPDATE);
     }
 
     private int[][] getTargetColumnIndices(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index ccdfced77bd..e6179dbaa58 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -29,9 +29,12 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import 
org.apache.flink.streaming.api.transformations.TransformationWithLineage;
 import 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -50,6 +53,9 @@ import 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
+import org.apache.flink.table.planner.lineage.TableLineageUtils;
+import org.apache.flink.table.planner.lineage.TableSinkLineageVertex;
+import org.apache.flink.table.planner.lineage.TableSinkLineageVertexImpl;
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
@@ -174,6 +180,20 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                             inputParallelism));
         }
 
+        Object outputObject = null;
+        if (runtimeProvider instanceof OutputFormatProvider) {
+            outputObject = ((OutputFormatProvider) 
runtimeProvider).createOutputFormat();
+        } else if (runtimeProvider instanceof SinkFunctionProvider) {
+            outputObject = ((SinkFunctionProvider) 
runtimeProvider).createSinkFunction();
+        } else if (runtimeProvider instanceof SinkProvider) {
+            outputObject = ((SinkProvider) runtimeProvider).createSink();
+        } else if (runtimeProvider instanceof SinkV2Provider) {
+            outputObject = ((SinkV2Provider) runtimeProvider).createSink();
+        }
+
+        Optional<LineageVertex> lineageVertexOpt =
+                TableLineageUtils.extractLineageDataset(outputObject);
+
         // only add materialization if input has change
         final boolean needMaterialization = !inputInsertOnly && 
upsertMaterialize;
 
@@ -209,15 +229,31 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
             sinkTransform = applyRowKindSetter(sinkTransform, 
targetRowKind.get(), config);
         }
 
-        return (Transformation<Object>)
-                applySinkProvider(
-                        sinkTransform,
-                        streamExecEnv,
-                        runtimeProvider,
-                        rowtimeFieldIndex,
-                        sinkParallelism,
-                        config,
-                        classLoader);
+        LineageDataset tableLineageDataset =
+                TableLineageUtils.createTableLineageDataset(
+                        tableSinkSpec.getContextResolvedTable(), 
lineageVertexOpt);
+
+        TableSinkLineageVertex sinkLineageVertex =
+                new TableSinkLineageVertexImpl(
+                        Arrays.asList(tableLineageDataset),
+                        TableLineageUtils.convert(inputChangelogMode));
+
+        Transformation transformation =
+                (Transformation<Object>)
+                        applySinkProvider(
+                                sinkTransform,
+                                streamExecEnv,
+                                runtimeProvider,
+                                rowtimeFieldIndex,
+                                sinkParallelism,
+                                config,
+                                classLoader);
+
+        if (transformation instanceof TransformationWithLineage) {
+            ((TransformationWithLineage<Object>) transformation)
+                    .setLineageVertex(sinkLineageVertex);
+        }
+        return transformation;
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index be5b46ba973..dfdee5208c5 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -29,10 +29,13 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import 
org.apache.flink.streaming.api.transformations.SourceTransformationWrapper;
+import 
org.apache.flink.streaming.api.transformations.TransformationWithLineage;
 import 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -47,6 +50,9 @@ import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.connectors.TransformationScanProvider;
 import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.lineage.TableLineageUtils;
+import org.apache.flink.table.planner.lineage.TableSourceLineageVertex;
+import org.apache.flink.table.planner.lineage.TableSourceLineageVertexImpl;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
@@ -67,6 +73,7 @@ import org.apache.flink.types.RowKind;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
@@ -121,9 +128,11 @@ public abstract class CommonExecTableSourceScan extends 
ExecNodeBase<RowData>
                 
tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         final int sourceParallelism = deriveSourceParallelism(provider);
         final boolean sourceParallelismConfigured = 
isParallelismConfigured(provider);
+        Optional<LineageVertex> lineageVertex = Optional.empty();
         if (provider instanceof SourceFunctionProvider) {
             final SourceFunctionProvider sourceFunctionProvider = 
(SourceFunctionProvider) provider;
             final SourceFunction<RowData> function = 
sourceFunctionProvider.createSourceFunction();
+            lineageVertex = TableLineageUtils.extractLineageDataset(function);
             sourceTransform =
                     createSourceFunctionTransformation(
                             env,
@@ -133,6 +142,20 @@ public abstract class CommonExecTableSourceScan extends 
ExecNodeBase<RowData>
                             outputTypeInfo,
                             sourceParallelism,
                             sourceParallelismConfigured);
+
+            LineageDataset tableLineageDataset =
+                    TableLineageUtils.createTableLineageDataset(
+                            tableSourceSpec.getContextResolvedTable(), 
lineageVertex);
+
+            TableSourceLineageVertex sourceLineageVertex =
+                    new TableSourceLineageVertexImpl(
+                            Arrays.asList(tableLineageDataset),
+                            provider.isBounded()
+                                    ? Boundedness.BOUNDED
+                                    : Boundedness.CONTINUOUS_UNBOUNDED);
+
+            ((TransformationWithLineage<RowData>) sourceTransform)
+                    .setLineageVertex(sourceLineageVertex);
             if (function instanceof ParallelSourceFunction && 
sourceParallelismConfigured) {
                 meta.fill(sourceTransform);
                 return new SourceTransformationWrapper<>(sourceTransform);
@@ -142,12 +165,14 @@ public abstract class CommonExecTableSourceScan extends 
ExecNodeBase<RowData>
         } else if (provider instanceof InputFormatProvider) {
             final InputFormat<RowData, ?> inputFormat =
                     ((InputFormatProvider) provider).createInputFormat();
+            lineageVertex = 
TableLineageUtils.extractLineageDataset(inputFormat);
             sourceTransform =
                     createInputFormatTransformation(
                             env, inputFormat, outputTypeInfo, meta.getName());
             meta.fill(sourceTransform);
         } else if (provider instanceof SourceProvider) {
             final Source<RowData, ?, ?> source = ((SourceProvider) 
provider).createSource();
+            lineageVertex = TableLineageUtils.extractLineageDataset(source);
             // TODO: Push down watermark strategy to source scan
             sourceTransform =
                     env.fromSource(
@@ -175,17 +200,35 @@ public abstract class CommonExecTableSourceScan extends 
ExecNodeBase<RowData>
                     provider.getClass().getSimpleName() + " is unsupported 
now.");
         }
 
+        LineageDataset tableLineageDataset =
+                TableLineageUtils.createTableLineageDataset(
+                        tableSourceSpec.getContextResolvedTable(), 
lineageVertex);
+
+        TableSourceLineageVertex sourceLineageVertex =
+                new TableSourceLineageVertexImpl(
+                        Arrays.asList(tableLineageDataset),
+                        provider.isBounded()
+                                ? Boundedness.BOUNDED
+                                : Boundedness.CONTINUOUS_UNBOUNDED);
+
+        if (sourceTransform instanceof TransformationWithLineage) {
+            ((TransformationWithLineage<RowData>) sourceTransform)
+                    .setLineageVertex(sourceLineageVertex);
+        }
+
         if (sourceParallelismConfigured) {
-            return applySourceTransformationWrapper(
-                    sourceTransform,
-                    planner.getFlinkContext().getClassLoader(),
-                    outputTypeInfo,
-                    config,
-                    tableSource.getChangelogMode(),
-                    sourceParallelism);
-        } else {
-            return sourceTransform;
+            Transformation<RowData> sourceTransformationWrapper =
+                    applySourceTransformationWrapper(
+                            sourceTransform,
+                            planner.getFlinkContext().getClassLoader(),
+                            outputTypeInfo,
+                            config,
+                            tableSource.getChangelogMode(),
+                            sourceParallelism);
+            return sourceTransformationWrapper;
         }
+
+        return sourceTransform;
     }
 
     private boolean 
isParallelismConfigured(ScanTableSource.ScanRuntimeProvider runtimeProvider) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java
index e4b24d82e7c..69ebba90bd7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java
@@ -32,14 +32,22 @@ import 
org.apache.flink.connector.source.split.ValuesSourceSplit;
 import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -52,7 +60,9 @@ import java.util.stream.IntStream;
  * must be 1. RowData is not serializable and the parallelism of table source 
may not be 1, so we
  * introduce a new source for testing in table module.
  */
-public class ValuesSource implements Source<RowData, ValuesSourceSplit, 
NoOpEnumState> {
+public class ValuesSource
+        implements Source<RowData, ValuesSourceSplit, NoOpEnumState>, 
LineageVertexProvider {
+    private static final String LINEAGE_NAMESPACE = "values://ValuesSource";
     private final TypeSerializer<RowData> serializer;
 
     private final List<byte[]> serializedElements;
@@ -126,4 +136,36 @@ public class ValuesSource implements Source<RowData, 
ValuesSourceSplit, NoOpEnum
     public SimpleVersionedSerializer<NoOpEnumState> 
getEnumeratorCheckpointSerializer() {
         return new NoOpEnumStateSerializer();
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        return new SourceLineageVertex() {
+            @Override
+            public Boundedness boundedness() {
+                return boundedness;
+            }
+
+            @Override
+            public List<LineageDataset> datasets() {
+                LineageDataset dataset =
+                        new LineageDataset() {
+                            @Override
+                            public String name() {
+                                return "";
+                            }
+
+                            @Override
+                            public String namespace() {
+                                return LINEAGE_NAMESPACE;
+                            }
+
+                            @Override
+                            public Map<String, LineageDatasetFacet> facets() {
+                                return new HashMap<>();
+                            }
+                        };
+                return Arrays.asList(dataset);
+            }
+        };
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index acc4cf4a0cd..0f8d0dd70a9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -35,6 +36,11 @@ import 
org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.table.connector.RuntimeConverter;
 import 
org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
 import org.apache.flink.table.connector.source.LookupTableSource;
@@ -182,6 +188,16 @@ final class TestValuesRuntimeFunctions {
         }
     }
 
+    static LineageVertex createLineageVertex(String name, String namespace) {
+        return new LineageVertex() {
+
+            @Override
+            public List<LineageDataset> datasets() {
+                return Arrays.asList(new DefaultLineageDataset(name, 
namespace, new HashMap<>()));
+            }
+        };
+    }
+
     private static String rowToString(Row row) {
         if (RowUtils.USE_LEGACY_TO_STRING) {
             return String.format("%s(%s)", row.getKind().shortString(), row);
@@ -194,7 +210,10 @@ final class TestValuesRuntimeFunctions {
     // Source Function implementations
     // 
------------------------------------------------------------------------------------------
 
-    public static class FromElementSourceFunctionWithWatermark implements 
SourceFunction<RowData> {
+    public static class FromElementSourceFunctionWithWatermark
+            implements SourceFunction<RowData>, LineageVertexProvider {
+        private static final String LINEAGE_NAMESPACE =
+                "values://FromElementSourceFunctionWithWatermark";
 
         /** The (de)serializer to be used for the data elements. */
         private final TypeSerializer<RowData> serializer;
@@ -278,6 +297,23 @@ final class TestValuesRuntimeFunctions {
             isRunning = false;
         }
 
+        @Override
+        public LineageVertex getLineageVertex() {
+            return new SourceLineageVertex() {
+                @Override
+                public Boundedness boundedness() {
+                    return Boundedness.BOUNDED;
+                }
+
+                @Override
+                public List<LineageDataset> datasets() {
+                    return Arrays.asList(
+                            new DefaultLineageDataset(
+                                    tableName, LINEAGE_NAMESPACE, new 
HashMap<>()));
+                }
+            };
+        }
+
         private class TestValuesWatermarkOutput implements WatermarkOutput {
             SourceContext<RowData> ctx;
 
@@ -314,7 +350,7 @@ final class TestValuesRuntimeFunctions {
      * restoring in streaming sql.
      */
     private abstract static class AbstractExactlyOnceSink extends 
RichSinkFunction<RowData>
-            implements CheckpointedFunction {
+            implements CheckpointedFunction, LineageVertexProvider {
         private static final long serialVersionUID = 1L;
 
         protected final String tableName;
@@ -359,6 +395,13 @@ final class TestValuesRuntimeFunctions {
             }
         }
 
+        @Override
+        public LineageVertex getLineageVertex() {
+            return createLineageVertex(tableName, getNamespace());
+        }
+
+        abstract String getNamespace();
+
         protected void addLocalRawResult(Row row) {
             localRawResult.add(row);
             Optional.ofNullable(localRawResultsObservers.get(tableName))
@@ -374,7 +417,7 @@ final class TestValuesRuntimeFunctions {
     }
 
     static class AppendingSinkFunction extends AbstractExactlyOnceSink {
-
+        private static final String LINEAGE_NAMESPACE = 
"values://AppendingSinkFunction";
         private static final long serialVersionUID = 1L;
         private final int rowtimeIndex;
 
@@ -407,6 +450,11 @@ final class TestValuesRuntimeFunctions {
                         "AppendingSinkFunction received " + value.getRowKind() 
+ " messages.");
             }
         }
+
+        @Override
+        public String getNamespace() {
+            return LINEAGE_NAMESPACE;
+        }
     }
 
     /**
@@ -414,6 +462,7 @@ final class TestValuesRuntimeFunctions {
      * databases.
      */
     static class KeyedUpsertingSinkFunction extends AbstractExactlyOnceSink {
+        private static final String LINEAGE_NAMESPACE = 
"values://KeyedUpsertingSinkFunction";
         private static final long serialVersionUID = 1L;
         private final int[] keyIndices;
         private final int[] targetColumnIndices;
@@ -510,9 +559,15 @@ final class TestValuesRuntimeFunctions {
                 return old;
             }
         }
+
+        @Override
+        public String getNamespace() {
+            return LINEAGE_NAMESPACE;
+        }
     }
 
     static class RetractingSinkFunction extends AbstractExactlyOnceSink {
+        private static final String LINEAGE_NAMESPACE = 
"values://RetractingSinkFunction";
         private static final long serialVersionUID = 1L;
 
         protected transient ListState<Row> retractResultState;
@@ -580,10 +635,16 @@ final class TestValuesRuntimeFunctions {
                 addLocalRawResult(row);
             }
         }
-    }
 
-    static class AppendingOutputFormat extends RichOutputFormat<RowData> {
+        @Override
+        public String getNamespace() {
+            return LINEAGE_NAMESPACE;
+        }
+    }
 
+    static class AppendingOutputFormat extends RichOutputFormat<RowData>
+            implements LineageVertexProvider {
+        private static final String LINEAGE_NAMESPACE = 
"values://AppendingOutputFormat";
         private static final long serialVersionUID = 1L;
         private final String tableName;
         private final DataStructureConverter converter;
@@ -633,6 +694,11 @@ final class TestValuesRuntimeFunctions {
             }
         }
 
+        @Override
+        public LineageVertex getLineageVertex() {
+            return createLineageVertex(tableName, LINEAGE_NAMESPACE);
+        }
+
         @Override
         public void close() throws IOException {
             // nothing to do
@@ -647,8 +713,9 @@ final class TestValuesRuntimeFunctions {
      * A lookup function which find matched rows with the given fields. NOTE: 
We have to declare it
      * as public because it will be used in code generation.
      */
-    public static class TestValuesLookupFunction extends LookupFunction {
-
+    public static class TestValuesLookupFunction extends LookupFunction
+            implements LineageVertexProvider {
+        private static final String LINEAGE_NAMESPACE = 
"values://TestValuesLookupFunction";
         private static final long serialVersionUID = 1L;
         private final List<Row> data;
         private final int[] lookupIndices;
@@ -703,6 +770,11 @@ final class TestValuesRuntimeFunctions {
             return indexedData.get(keyRow);
         }
 
+        @Override
+        public LineageVertex getLineageVertex() {
+            return createLineageVertex("", LINEAGE_NAMESPACE);
+        }
+
         @Override
         public void close() throws Exception {
             RESOURCE_COUNTER.decrementAndGet();
@@ -740,8 +812,9 @@ final class TestValuesRuntimeFunctions {
      * An async lookup function which find matched rows with the given fields. 
NOTE: We have to
      * declare it as public because it will be used in code generation.
      */
-    public static class AsyncTestValueLookupFunction extends 
AsyncLookupFunction {
-
+    public static class AsyncTestValueLookupFunction extends 
AsyncLookupFunction
+            implements LineageVertexProvider {
+        private static final String LINEAGE_NAMESPACE = 
"values://TestValuesLookupFunction";
         private static final long serialVersionUID = 1L;
         private final List<Row> data;
         private final int[] lookupIndices;
@@ -809,6 +882,11 @@ final class TestValuesRuntimeFunctions {
                     executor);
         }
 
+        @Override
+        public LineageVertex getLineageVertex() {
+            return createLineageVertex("", LINEAGE_NAMESPACE);
+        }
+
         @Override
         public void close() throws Exception {
             RESOURCE_COUNTER.decrementAndGet();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TableLineageGraphTest.java
similarity index 55%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TableLineageGraphTest.java
index 7add1ce88ed..ec04c992b3a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TableLineageGraphTest.java
@@ -14,28 +14,24 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.flink.streaming.api.lineage;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+package org.apache.flink.table.planner.plan.batch.sql;
 
-import java.util.List;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.TableLineageGraphTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
-@PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
+/** Lineage Graph tests for various queries in batch. */
+public class TableLineageGraphTest extends TableLineageGraphTestBase {
 
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+    @Override
+    protected TableTestUtil getTableTestUtil() {
+        return batchTestUtil(TableConfig.getDefault());
+    }
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    @Override
+    protected boolean isBatchMode() {
+        return true;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java
new file mode 100644
index 00000000000..fc097cd51e4
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.lineage.LineageGraph;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
+
+import net.javacrumbs.jsonunit.core.Option;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson;
+
+/** Lineage Graph tests for various queries. */
+public abstract class TableLineageGraphTestBase extends TableTestBase {
+    private static final String RESOURCE_PATH = 
"src/test/resources/lineage-graph/";
+    private final ObjectMapper mapper = new ObjectMapper();
+    private TableTestUtil util;
+
+    protected abstract boolean isBatchMode();
+
+    protected abstract TableTestUtil getTableTestUtil();
+
+    private final String query =
+            "SELECT\n"
+                    + "  AVG(a) AS avg_a,\n"
+                    + "  COUNT(*) AS cnt,\n"
+                    + "  count(b) AS cnt_b,\n"
+                    + "  min(b) AS min_b,\n"
+                    + "  MAX(c) FILTER (WHERE a > 1) AS max_c\n"
+                    + "FROM FirstTable";
+
+    private final String lookupJoin = "";
+
+    private final String union =
+            "("
+                    + query
+                    + ") UNION \n"
+                    + "(SELECT\n"
+                    + "  AVG(a) AS avg_a,\n"
+                    + "  COUNT(*) AS cnt,\n"
+                    + "  count(b) AS cnt_b,\n"
+                    + "  min(b) AS min_b,\n"
+                    + "  MAX(c) FILTER (WHERE a > 1) AS max_c\n"
+                    + "FROM SecondTable)";
+
+    @BeforeEach
+    void setup() {
+        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+        util = getTableTestUtil();
+
+        util.getTableEnv()
+                .executeSql(
+                        "CREATE TABLE FirstTable (\n"
+                                + "  a BIGINT,\n"
+                                + "  b INT NOT NULL,\n"
+                                + "  c VARCHAR,\n"
+                                + "  d BIGINT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = '"
+                                + isBatchMode()
+                                + "')");
+
+        util.getTableEnv()
+                .executeSql(
+                        "CREATE TABLE SecondTable (\n"
+                                + "  a BIGINT,\n"
+                                + "  b INT NOT NULL,\n"
+                                + "  c VARCHAR,\n"
+                                + "  d BIGINT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = '"
+                                + isBatchMode()
+                                + "')");
+
+        util.getTableEnv()
+                .executeSql(
+                        "CREATE TABLE SinkTable (\n"
+                                + "  avg_a DOUBLE,\n"
+                                + "  cnt BIGINT,\n"
+                                + "  cnt_b BIGINT,\n"
+                                + "  min_b BIGINT,\n"
+                                + "  max_c VARCHAR\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'sink-insert-only' = 'false')");
+    }
+
+    @Test
+    void testInsertWithSelect() throws Exception {
+        List<Transformation<?>> transformations =
+                util.generateTransformations(String.format("INSERT INTO 
SinkTable\n%s", query));
+        LineageGraph lineageGraph = generateLineageGraph(transformations);
+        verify(lineageGraph, isBatchMode() ? "query-batch.json" : 
"query-stream.json");
+    }
+
+    @Test
+    void testInsertWithUnion() throws Exception {
+        List<Transformation<?>> transformations =
+                util.generateTransformations(String.format("INSERT INTO 
SinkTable\n%s", union));
+        LineageGraph lineageGraph = generateLineageGraph(transformations);
+        verify(lineageGraph, isBatchMode() ? "union-batch.json" : 
"union-stream.json");
+    }
+
+    private LineageGraph generateLineageGraph(List<Transformation<?>> 
transformations) {
+        StreamGraphGenerator streamGraphGenerator =
+                new StreamGraphGenerator(
+                        transformations, new ExecutionConfig(), new 
CheckpointConfig());
+        StreamGraph graph = streamGraphGenerator.generate();
+        LineageGraph lineageGraph = graph.getLineageGraph();
+        return lineageGraph;
+    }
+
+    private void verify(LineageGraph lineageGraph, String jsonPath) throws 
Exception {
+        String json = mapper.writeValueAsString(lineageGraph);
+        String expected = new 
String(Files.readAllBytes(Paths.get(RESOURCE_PATH + jsonPath)));
+        
assertThatJson(json).when(Option.IGNORING_ARRAY_ORDER).isEqualTo(expected);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index 43157e32a83..a1de42f5829 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.exec;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
 import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.streaming.api.transformations.WithBoundedness;
 import org.apache.flink.table.api.CompiledPlan;
@@ -34,6 +35,7 @@ import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
+import org.apache.flink.table.planner.lineage.TableSourceLineageVertex;
 import org.apache.flink.table.planner.utils.JsonTestUtils;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -84,6 +86,15 @@ class TransformationsTest {
 
         assertBoundedness(Boundedness.BOUNDED, sourceTransform);
         
assertThat(sourceTransform.getOperator().emitsProgressiveWatermarks()).isFalse();
+
+        assertThat(sourceTransform.getLineageVertex()).isNotNull();
+        assertThat(((TableSourceLineageVertex) 
sourceTransform.getLineageVertex()).boundedness())
+                .isEqualTo(Boundedness.BOUNDED);
+
+        List<LineageDataset> datasets = 
sourceTransform.getLineageVertex().datasets();
+        assertThat(datasets.size()).isEqualTo(1);
+        assertThat(datasets.get(0).name()).contains("*anonymous_values$");
+        
assertThat(datasets.get(0).namespace()).isEqualTo("values://FromElementsFunction");
     }
 
     @Test
@@ -105,6 +116,15 @@ class TransformationsTest {
 
         assertBoundedness(Boundedness.CONTINUOUS_UNBOUNDED, sourceTransform);
         
assertThat(sourceTransform.getOperator().emitsProgressiveWatermarks()).isTrue();
+
+        assertThat(sourceTransform.getLineageVertex()).isNotNull();
+        assertThat(((TableSourceLineageVertex) 
sourceTransform.getLineageVertex()).boundedness())
+                .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+
+        List<LineageDataset> datasets = 
sourceTransform.getLineageVertex().datasets();
+        assertThat(datasets.size()).isEqualTo(1);
+        assertThat(datasets.get(0).name()).contains("*anonymous_values$");
+        
assertThat(datasets.get(0).namespace()).isEqualTo("values://FromElementsFunction");
     }
 
     @Test
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/TableLineageGraphTest.java
similarity index 55%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/TableLineageGraphTest.java
index 7add1ce88ed..34d6140185d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/TableLineageGraphTest.java
@@ -14,28 +14,24 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.flink.streaming.api.lineage;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+package org.apache.flink.table.planner.plan.stream.sql;
 
-import java.util.List;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.TableLineageGraphTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
 
-/**
- * Job lineage is built according to {@link StreamGraph}, users can get 
sources, sinks and
- * relationships from lineage and manage the relationship between jobs and 
tables.
- */
-@PublicEvolving
-public interface LineageGraph {
-    /* Source lineage vertex list. */
-    List<SourceLineageVertex> sources();
+/** Lineage Graph tests for various queries in stream. */
+public class TableLineageGraphTest extends TableLineageGraphTestBase {
 
-    /* Sink lineage vertex list. */
-    List<LineageVertex> sinks();
+    @Override
+    protected TableTestUtil getTableTestUtil() {
+        return streamTestUtil(TableConfig.getDefault());
+    }
 
-    /* lineage edges from sources to sinks. */
-    List<LineageEdge> relations();
+    @Override
+    protected boolean isBatchMode() {
+        return false;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-batch.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-batch.json
new file mode 100644
index 00000000000..bd1f60a00ca
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-batch.json
@@ -0,0 +1,70 @@
+{
+  "lineageEdges": [
+    {
+      "sourceLineageVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.FirstTable",
+            "namespace": "values://FromElementsFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "FirstTable",
+              "fullName": "default_database.FirstTable"
+            },
+            "facets": {}
+          }
+        ],
+        "boundedness": "BOUNDED"
+      },
+      "sinkVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SinkTable",
+            "namespace": "values://RetractingSinkFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SinkTable",
+              "fullName": "default_database.SinkTable"
+            },
+            "facets": {}
+          }
+        ],
+        "modifyType": "INSERT"
+      }
+    }
+  ],
+  "sources": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.FirstTable",
+          "namespace": "values://FromElementsFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "FirstTable",
+            "fullName": "default_database.FirstTable"
+          },
+          "facets": {}
+        }
+      ],
+      "boundedness": "BOUNDED"
+    }
+  ],
+  "sinks": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.SinkTable",
+          "namespace": "values://RetractingSinkFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "SinkTable",
+            "fullName": "default_database.SinkTable"
+          },
+          "facets": {}
+        }
+      ],
+      "modifyType": "INSERT"
+    }
+  ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-stream.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-stream.json
new file mode 100644
index 00000000000..485f6e919fc
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-stream.json
@@ -0,0 +1,70 @@
+{
+  "lineageEdges": [
+    {
+      "sourceLineageVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.FirstTable",
+            "namespace": "values://FromElementsFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "FirstTable",
+              "fullName": "default_database.FirstTable"
+            },
+            "facets": {}
+          }
+        ],
+        "boundedness": "CONTINUOUS_UNBOUNDED"
+      },
+      "sinkVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SinkTable",
+            "namespace": "values://RetractingSinkFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SinkTable",
+              "fullName": "default_database.SinkTable"
+            },
+            "facets": {}
+          }
+        ],
+        "modifyType": "UPDATE"
+      }
+    }
+  ],
+  "sources": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.FirstTable",
+          "namespace": "values://FromElementsFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "FirstTable",
+            "fullName": "default_database.FirstTable"
+          },
+          "facets": {}
+        }
+      ],
+      "boundedness": "CONTINUOUS_UNBOUNDED"
+    }
+  ],
+  "sinks": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.SinkTable",
+          "namespace": "values://RetractingSinkFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "SinkTable",
+            "fullName": "default_database.SinkTable"
+          },
+          "facets": {}
+        }
+      ],
+      "modifyType": "UPDATE"
+    }
+  ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-batch.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-batch.json
new file mode 100644
index 00000000000..18a195dd8de
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-batch.json
@@ -0,0 +1,117 @@
+{
+  "lineageEdges": [
+    {
+      "sourceLineageVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.FirstTable",
+            "namespace": "values://FromElementsFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "FirstTable",
+              "fullName": "default_database.FirstTable"
+            },
+            "facets": {}
+          }
+        ],
+        "boundedness": "BOUNDED"
+      },
+      "sinkVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SinkTable",
+            "namespace": "values://RetractingSinkFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SinkTable",
+              "fullName": "default_database.SinkTable"
+            },
+            "facets": {}
+          }
+        ],
+        "modifyType": "INSERT"
+      }
+    },
+    {
+      "sourceLineageVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SecondTable",
+            "namespace": "values://FromElementsFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SecondTable",
+              "fullName": "default_database.SecondTable"
+            },
+            "facets": {}
+          }
+        ],
+        "boundedness": "BOUNDED"
+      },
+      "sinkVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SinkTable",
+            "namespace": "values://RetractingSinkFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SinkTable",
+              "fullName": "default_database.SinkTable"
+            },
+            "facets": {}
+          }
+        ],
+        "modifyType": "INSERT"
+      }
+    }
+  ],
+  "sources": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.FirstTable",
+          "namespace": "values://FromElementsFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "FirstTable",
+            "fullName": "default_database.FirstTable"
+          },
+          "facets": {}
+        }
+      ],
+      "boundedness": "BOUNDED"
+    },
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.SecondTable",
+          "namespace": "values://FromElementsFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "SecondTable",
+            "fullName": "default_database.SecondTable"
+          },
+          "facets": {}
+        }
+      ],
+      "boundedness": "BOUNDED"
+    }
+  ],
+  "sinks": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.SinkTable",
+          "namespace": "values://RetractingSinkFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "SinkTable",
+            "fullName": "default_database.SinkTable"
+          },
+          "facets": {}
+        }
+      ],
+      "modifyType": "INSERT"
+    }
+  ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-stream.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-stream.json
new file mode 100644
index 00000000000..66e641b7a77
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-stream.json
@@ -0,0 +1,117 @@
+{
+  "lineageEdges": [
+    {
+      "sourceLineageVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.FirstTable",
+            "namespace": "values://FromElementsFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "FirstTable",
+              "fullName": "default_database.FirstTable"
+            },
+            "facets": {}
+          }
+        ],
+        "boundedness": "CONTINUOUS_UNBOUNDED"
+      },
+      "sinkVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SinkTable",
+            "namespace": "values://RetractingSinkFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SinkTable",
+              "fullName": "default_database.SinkTable"
+            },
+            "facets": {}
+          }
+        ],
+        "modifyType": "UPDATE"
+      }
+    },
+    {
+      "sourceLineageVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SecondTable",
+            "namespace": "values://FromElementsFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SecondTable",
+              "fullName": "default_database.SecondTable"
+            },
+            "facets": {}
+          }
+        ],
+        "boundedness": "CONTINUOUS_UNBOUNDED"
+      },
+      "sinkVertex": {
+        "datasets": [
+          {
+            "name": "default_catalog.default_database.SinkTable",
+            "namespace": "values://RetractingSinkFunction",
+            "objectPath": {
+              "databaseName": "default_database",
+              "objectName": "SinkTable",
+              "fullName": "default_database.SinkTable"
+            },
+            "facets": {}
+          }
+        ],
+        "modifyType": "UPDATE"
+      }
+    }
+  ],
+  "sources": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.FirstTable",
+          "namespace": "values://FromElementsFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "FirstTable",
+            "fullName": "default_database.FirstTable"
+          },
+          "facets": {}
+        }
+      ],
+      "boundedness": "CONTINUOUS_UNBOUNDED"
+    },
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.SecondTable",
+          "namespace": "values://FromElementsFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "SecondTable",
+            "fullName": "default_database.SecondTable"
+          },
+          "facets": {}
+        }
+      ],
+      "boundedness": "CONTINUOUS_UNBOUNDED"
+    }
+  ],
+  "sinks": [
+    {
+      "datasets": [
+        {
+          "name": "default_catalog.default_database.SinkTable",
+          "namespace": "values://RetractingSinkFunction",
+          "objectPath": {
+            "databaseName": "default_database",
+            "objectName": "SinkTable",
+            "fullName": "default_database.SinkTable"
+          },
+          "facets": {}
+        }
+      ],
+      "modifyType": "UPDATE"
+    }
+  ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index a4c8d0ec1f7..4b19ef16150 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.utils
 
 import org.apache.flink.FlinkVersion
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, 
TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.BatchExecutionOptions
@@ -27,6 +28,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
 import org.apache.flink.streaming.api.{environment, TimeCharacteristic}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, 
StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.graph.StreamGraph
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => 
ScalaStreamExecEnv}
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => 
JavaStreamTableEnv}
@@ -946,6 +948,21 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
       withQueryBlockAlias = false)
   }
 
+  /**
+   * Generate the transformations translated from the INSERT statement.
+   *
+   * @param insert
+   *   the INSERT statement to translate
+   */
+  def generateTransformations(insert: String): util.List[Transformation[_]] = {
+    val stmtSet = getTableEnv.createStatementSet()
+    stmtSet.addInsertSql(insert)
+
+    val testStmtSet = stmtSet.asInstanceOf[StatementSetImpl[_]]
+    val operations = testStmtSet.getOperations;
+    getPlanner.translate(operations)
+  }
+
   /**
    * Verify the expected plans translated from the given [[Table]].
    *
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java
index e0d7e31114b..f430f86e0c1 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java
@@ -20,8 +20,14 @@ package org.apache.flink.table.runtime.operators.values;
 
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedInput;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -30,11 +36,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 
 /** Generated ValuesInputFormat. */
 public class ValuesInputFormat extends GenericInputFormat<RowData>
-        implements NonParallelInput, ResultTypeQueryable<RowData> {
-
+        implements NonParallelInput, ResultTypeQueryable<RowData>, 
LineageVertexProvider {
+    private static final String LINEAGE_NAMESPACE = 
"values://ValuesInputFormat";
     private static final Logger LOG = 
LoggerFactory.getLogger(ValuesInputFormat.class);
     private static final long serialVersionUID = 1L;
 
@@ -75,4 +84,20 @@ public class ValuesInputFormat extends 
GenericInputFormat<RowData>
     public InternalTypeInfo<RowData> getProducedType() {
         return returnType;
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        return new SourceLineageVertex() {
+            @Override
+            public Boundedness boundedness() {
+                return Boundedness.BOUNDED;
+            }
+
+            @Override
+            public List<LineageDataset> datasets() {
+                return Arrays.asList(
+                        new DefaultLineageDataset("", LINEAGE_NAMESPACE, new 
HashMap<>()));
+            }
+        };
+    }
 }
diff --git a/pom.xml b/pom.xml
index f5a3961b46b..1449c6801d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1679,6 +1679,7 @@ under the License.
                                                
<exclude>flink-table/flink-table-planner/src/test/resources/**/*.out</exclude>
                                                
<exclude>flink-table/flink-table-planner/src/test/resources/json/*.json</exclude>
                                                
<exclude>flink-table/flink-table-planner/src/test/resources/jsonplan/*.json</exclude>
+                                               
<exclude>flink-table/flink-table-planner/src/test/resources/lineage-graph/*.json</exclude>
                                                
<exclude>flink-table/flink-table-planner/src/test/resources/restore-tests/**</exclude>
                                                
<exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
                                                
<exclude>flink-end-to-end-tests/test-scripts/test-data/**</exclude>

Reply via email to