This is an automated email from the ASF dual-hosted git repository. zjureel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 960363cd3c6 [FLINK-33211][table] support flink table lineage (#24618) 960363cd3c6 is described below commit 960363cd3c6c82f7e56ef781295756105f7b5eba Author: Peter Huang <huangzhenqiu0...@gmail.com> AuthorDate: Tue Jul 2 18:58:18 2024 -0700 [FLINK-33211][table] support flink table lineage (#24618) --- .../api/functions/source/FromElementsFunction.java | 30 ++++- .../flink/streaming/api/graph/StreamGraph.java | 9 ++ .../streaming/api/graph/StreamGraphGenerator.java | 6 +- ...ineageGraph.java => DefaultLineageDataset.java} | 43 +++--- .../{LineageGraph.java => DefaultLineageEdge.java} | 35 ++--- .../streaming/api/lineage/DefaultLineageGraph.java | 8 +- .../flink/streaming/api/lineage/LineageGraph.java | 5 +- .../streaming/api/lineage/LineageGraphUtils.java | 86 ++++++++++++ .../transformations/LegacySinkTransformation.java | 2 +- .../LegacySourceTransformation.java | 2 +- .../api/transformations/SinkTransformation.java | 2 +- .../api/transformations/SourceTransformation.java | 2 +- .../transformations/TransformationWithLineage.java | 75 +++++++++++ .../translators/SinkTransformationTranslator.java | 2 +- .../apache/flink/table/operations/ModifyType.java | 22 +-- .../table/operations/SinkModifyOperation.java | 10 -- flink-table/flink-table-planner/pom.xml | 9 +- .../table/planner/lineage/TableLineageDataset.java | 30 ++--- .../planner/lineage/TableLineageDatasetImpl.java | 100 ++++++++++++++ .../table/planner/lineage/TableLineageUtils.java | 93 +++++++++++++ .../planner/lineage/TableSinkLineageVertex.java | 27 ++-- .../lineage/TableSinkLineageVertexImpl.java | 38 +++--- .../planner/lineage/TableSourceLineageVertex.java | 23 +--- .../lineage/TableSourceLineageVertexImpl.java | 47 +++++++ .../operations/SqlNodeToOperationConversion.java | 8 +- .../plan/nodes/exec/common/CommonExecSink.java | 54 ++++++-- .../exec/common/CommonExecTableSourceScan.java | 61 +++++++-- 
.../flink/connector/source/ValuesSource.java | 44 +++++- .../factories/TestValuesRuntimeFunctions.java | 96 +++++++++++-- .../plan/batch/sql/TableLineageGraphTest.java | 32 ++--- .../plan/common/TableLineageGraphTestBase.java | 150 +++++++++++++++++++++ .../plan/nodes/exec/TransformationsTest.java | 20 +++ .../plan/stream/sql/TableLineageGraphTest.java | 32 ++--- .../test/resources/lineage-graph/query-batch.json | 70 ++++++++++ .../test/resources/lineage-graph/query-stream.json | 70 ++++++++++ .../test/resources/lineage-graph/union-batch.json | 117 ++++++++++++++++ .../test/resources/lineage-graph/union-stream.json | 117 ++++++++++++++++ .../flink/table/planner/utils/TableTestBase.scala | 17 +++ .../operators/values/ValuesInputFormat.java | 29 +++- pom.xml | 1 + 40 files changed, 1413 insertions(+), 211 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index aff13245afa..c183b6494e2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -25,12 +25,18 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.util.IterableUtils; import org.apache.flink.util.Preconditions; @@ -45,6 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Objects; @@ -65,8 +72,11 @@ import java.util.Objects; @Deprecated @PublicEvolving public class FromElementsFunction<T> - implements SourceFunction<T>, CheckpointedFunction, OutputTypeConfigurable<T> { - + implements SourceFunction<T>, + CheckpointedFunction, + OutputTypeConfigurable<T>, + LineageVertexProvider { + private static final String LINEAGE_NAMESPACE = "values://FromElementsFunction"; private static final long serialVersionUID = 1L; /** The (de)serializer to be used for the data elements. 
*/ @@ -305,4 +315,20 @@ public class FromElementsFunction<T> } } } + + @Override + public LineageVertex getLineageVertex() { + return new SourceLineageVertex() { + @Override + public Boundedness boundedness() { + return Boundedness.BOUNDED; + } + + @Override + public List<LineageDataset> datasets() { + return Arrays.asList( + new DefaultLineageDataset("", LINEAGE_NAMESPACE, new HashMap<>())); + } + }; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 408bf00fd11..e6e9c749b9d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -124,6 +124,7 @@ public class StreamGraph implements Pipeline { private CheckpointStorage checkpointStorage; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; private InternalTimeServiceManager.Provider timerServiceProvider; + private LineageGraph lineageGraph; private JobType jobType = JobType.STREAMING; private Map<String, ResourceProfile> slotSharingGroupResources; private PipelineOptions.VertexDescriptionMode descriptionMode = @@ -192,6 +193,14 @@ public class StreamGraph implements Pipeline { this.jobName = jobName; } + public LineageGraph getLineageGraph() { + return lineageGraph; + } + + public void setLineageGraph(LineageGraph lineageGraph) { + this.lineageGraph = lineageGraph; + } + public void setStateBackend(StateBackend backend) { this.stateBackend = backend; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index a811cb2802e..d9ac8e59fbc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -46,6 +46,8 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.lineage.LineageGraph; +import org.apache.flink.streaming.api.lineage.LineageGraphUtils; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend; @@ -281,11 +283,13 @@ public class StreamGraphGenerator { for (Transformation<?> transformation : transformations) { transform(transformation); } - streamGraph.setSlotSharingGroupResource(slotSharingGroupResources); setFineGrainedGlobalStreamExchangeMode(streamGraph); + LineageGraph lineageGraph = LineageGraphUtils.convertToLineageGraph(transformations); + streamGraph.setLineageGraph(lineageGraph); + for (StreamNode node : streamGraph.getStreamNodes()) { if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) { for (StreamEdge edge : node.getInEdges()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java similarity index 52% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java index 7add1ce88ed..984eb17c54b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java @@ -19,23 +19,36 @@ package org.apache.flink.streaming.api.lineage; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.annotation.Internal; -import java.util.List; +import java.util.Map; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ -@PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); +/** Default implementation for {@link LineageDataset}. */ +@Internal +public class DefaultLineageDataset implements LineageDataset { + private String name; + private String namespace; + private Map<String, LineageDatasetFacet> facets; + + public DefaultLineageDataset( + String name, String namespace, Map<String, LineageDatasetFacet> facets) { + this.name = name; + this.namespace = namespace; + this.facets = facets; + } + + @Override + public String name() { + return name; + } - /* Sink lineage vertex list. */ - List<LineageVertex> sinks(); + @Override + public String namespace() { + return namespace; + } - /* lineage edges from sources to sinks. 
*/ - List<LineageEdge> relations(); + @Override + public Map<String, LineageDatasetFacet> facets() { + return facets; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageEdge.java similarity index 53% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageEdge.java index 7add1ce88ed..b672a9ddd39 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageEdge.java @@ -19,23 +19,28 @@ package org.apache.flink.streaming.api.lineage; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.annotation.Internal; -import java.util.List; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ -@PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); +/** Implementation of LineageEdge. */ +@Internal +public class DefaultLineageEdge implements LineageEdge { + @JsonProperty private SourceLineageVertex sourceLineageVertex; + @JsonProperty private LineageVertex sinkVertex; + + public DefaultLineageEdge(SourceLineageVertex sourceLineageVertex, LineageVertex sinkVertex) { + this.sourceLineageVertex = sourceLineageVertex; + this.sinkVertex = sinkVertex; + } - /* Sink lineage vertex list. 
*/ - List<LineageVertex> sinks(); + @Override + public SourceLineageVertex source() { + return this.sourceLineageVertex; + } - /* lineage edges from sources to sinks. */ - List<LineageEdge> relations(); + @Override + public LineageVertex sink() { + return this.sinkVertex; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java index 15e70bfec67..24109cd8c32 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageGraph.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.lineage; import org.apache.flink.annotation.Internal; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,9 +32,9 @@ import java.util.Set; /** Default implementation for {@link LineageGraph}. 
*/ @Internal public class DefaultLineageGraph implements LineageGraph { - private final List<LineageEdge> lineageEdges; - private final List<SourceLineageVertex> sources; - private final List<LineageVertex> sinks; + @JsonProperty private final List<LineageEdge> lineageEdges; + @JsonProperty private final List<SourceLineageVertex> sources; + @JsonProperty private final List<LineageVertex> sinks; private DefaultLineageGraph(List<LineageEdge> lineageEdges) { this.lineageEdges = lineageEdges; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java index 7add1ce88ed..a70f40eea72 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java @@ -20,13 +20,12 @@ package org.apache.flink.streaming.api.lineage; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; import java.util.List; /** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. + * Job lineage graph that users can get sources, sinks and relationships from lineage and manage the + * relationship between jobs and tables. 
*/ @PublicEvolving public interface LineageGraph { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraphUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraphUtils.java new file mode 100644 index 00000000000..550a57fa432 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraphUtils.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** Utils for building lineage graph from transformations. */ +@Internal +public class LineageGraphUtils { + + /** Convert transforms to LineageGraph. 
*/ + public static LineageGraph convertToLineageGraph(List<Transformation<?>> transformations) { + DefaultLineageGraph.LineageGraphBuilder builder = DefaultLineageGraph.builder(); + for (Transformation<?> transformation : transformations) { + List<LineageEdge> edges = processSink(transformation); + for (LineageEdge lineageEdge : edges) { + builder.addLineageEdge(lineageEdge); + } + } + return builder.build(); + } + + private static List<LineageEdge> processSink(Transformation<?> transformation) { + List<LineageEdge> lineageEdges = new ArrayList<>(); + LineageVertex sinkLineageVertex = null; + if (transformation instanceof SinkTransformation) { + sinkLineageVertex = ((SinkTransformation<?, ?>) transformation).getLineageVertex(); + } else if (transformation instanceof LegacySinkTransformation) { + sinkLineageVertex = ((LegacySinkTransformation) transformation).getLineageVertex(); + } + + if (sinkLineageVertex != null) { + List<Transformation<?>> predecessors = transformation.getTransitivePredecessors(); + for (Transformation<?> predecessor : predecessors) { + Optional<SourceLineageVertex> sourceOpt = processSource(predecessor); + if (sourceOpt.isPresent()) { + lineageEdges.add(new DefaultLineageEdge(sourceOpt.get(), sinkLineageVertex)); + } + } + } + return lineageEdges; + } + + private static Optional<SourceLineageVertex> processSource(Transformation<?> transformation) { + if (transformation instanceof SourceTransformation) { + if (((SourceTransformation) transformation).getLineageVertex() != null) { + return Optional.of( + (SourceLineageVertex) + ((SourceTransformation) transformation).getLineageVertex()); + } + } else if (transformation instanceof LegacySourceTransformation) { + if (((LegacySourceTransformation) transformation).getLineageVertex() != null) { + return Optional.of( + (SourceLineageVertex) + ((LegacySourceTransformation) transformation).getLineageVertex()); + } + } + return Optional.empty(); + } +} diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java index fcd753c13b4..745af25d238 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java @@ -46,7 +46,7 @@ import java.util.List; * @param <T> The type of the elements in the input {@code LegacySinkTransformation} */ @Internal -public class LegacySinkTransformation<T> extends PhysicalTransformation<T> { +public class LegacySinkTransformation<T> extends TransformationWithLineage<T> { private final Transformation<T> input; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java index 9443a3a14ea..51b101e47b6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java @@ -40,7 +40,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <T> The type of the elements that this source produces */ @Internal -public class LegacySourceTransformation<T> extends PhysicalTransformation<T> +public class LegacySourceTransformation<T> extends TransformationWithLineage<T> implements WithBoundedness { private final StreamOperatorFactory<T> operatorFactory; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java index 
a4b4310588e..86ceff45220 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java @@ -43,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <OutputT> The output type of the {@link Sink} */ @Internal -public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<OutputT> { +public class SinkTransformation<InputT, OutputT> extends TransformationWithLineage<OutputT> { private final DataStream<InputT> inputStream; private final Sink<InputT> sink; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java index 0260da43b82..c5190d4477d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java @@ -37,7 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** A {@link PhysicalTransformation} for {@link Source}. 
*/ @Internal public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT> - extends PhysicalTransformation<OUT> implements WithBoundedness { + extends TransformationWithLineage<OUT> implements WithBoundedness { private final Source<OUT, SplitT, EnumChkT> source; private final WatermarkStrategy<OUT> watermarkStrategy; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TransformationWithLineage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TransformationWithLineage.java new file mode 100644 index 00000000000..91d0d8776eb --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TransformationWithLineage.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.lineage.LineageVertex; + +/** + * A {@link Transformation} that contains lineage information. 
+ * + * @param <T> The type of the elements that result from this {@code Transformation} + * @see Transformation + */ +@Internal +public abstract class TransformationWithLineage<T> extends PhysicalTransformation<T> { + private LineageVertex lineageVertex; + + /** + * Creates a new {@code Transformation} with the given name, output type and parallelism. + * + * @param name The name of the {@code Transformation}, this will be shown in Visualizations and + * the Log + * @param outputType The output type of this {@code Transformation} + * @param parallelism The parallelism of this {@code Transformation} + */ + TransformationWithLineage(String name, TypeInformation<T> outputType, int parallelism) { + super(name, outputType, parallelism); + } + + /** + * Creates a new {@code Transformation} with the given name, output type and parallelism. + * + * @param name The name of the {@code Transformation}, this will be shown in Visualizations and + * the Log + * @param outputType The output type of this {@code Transformation} + * @param parallelism The parallelism of this {@code Transformation} + * @param parallelismConfigured If true, the parallelism of the transformation is explicitly set + * and should be respected. Otherwise the parallelism can be changed at runtime. + */ + TransformationWithLineage( + String name, + TypeInformation<T> outputType, + int parallelism, + boolean parallelismConfigured) { + super(name, outputType, parallelism, parallelismConfigured); + } + + /** Returns the lineage vertex of this {@code Transformation}. */ + public LineageVertex getLineageVertex() { + return lineageVertex; + } + + /** Change the lineage vertex of this {@code Transformation}. 
*/ + public void setLineageVertex(LineageVertex lineageVertex) { + this.lineageVertex = lineageVertex; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index ffe0f3e28b1..14f03529d17 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link - * org.apache.flink.streaming.api.transformations.SinkTransformation}. + * SinkTransformation}. */ @Internal public class SinkTransformationTranslator<Input, Output> diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyType.java similarity index 60% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyType.java index 7add1ce88ed..82892d3180d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyType.java @@ -14,28 +14,18 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- * */ -package org.apache.flink.streaming.api.lineage; +package org.apache.flink.table.operations; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; - -import java.util.List; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ +/** The type of sink modification. */ @PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); +public enum ModifyType { + INSERT, - /* Sink lineage vertex list. */ - List<LineageVertex> sinks(); + UPDATE, - /* lineage edges from sources to sinks. */ - List<LineageEdge> relations(); + DELETE } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java index 82ff8c1efcc..0b5e8ae3c7b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java @@ -163,14 +163,4 @@ public class SinkModifyOperation implements ModifyOperation { Collections.singletonList(child), Operation::asSummaryString); } - - /** The type of sink modification. */ - @Internal - public enum ModifyType { - INSERT, - - UPDATE, - - DELETE - } } diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml index 08b506bd538..e1ec119de09 100644 --- a/flink-table/flink-table-planner/pom.xml +++ b/flink-table/flink-table-planner/pom.xml @@ -235,7 +235,14 @@ under the License. 
<type>test-jar</type> <scope>test</scope> </dependency> - </dependencies> + + <dependency> + <groupId>net.javacrumbs.json-unit</groupId> + <artifactId>json-unit-assertj</artifactId> + <version>2.23.0</version> + <scope>test</scope> + </dependency> + </dependencies> <build> <plugins> diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDataset.java similarity index 54% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDataset.java index 7add1ce88ed..4748cd1444f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDataset.java @@ -14,28 +14,24 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.flink.streaming.api.lineage; +package org.apache.flink.table.planner.lineage; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.listener.CatalogContext; -import java.util.List; +/** Basic table lineage dataset which has catalog context and table in it. */ +public interface TableLineageDataset extends LineageDataset { -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. 
- */ -@PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); + /* The catalog context of the table lineage vertex. */ + CatalogContext catalogContext(); - /* Sink lineage vertex list. */ - List<LineageVertex> sinks(); + /* The table of the lineage vertex. */ + CatalogBaseTable table(); - /* lineage edges from sources to sinks. */ - List<LineageEdge> relations(); + /* Database name and table name for the table lineage vertex. */ + ObjectPath objectPath(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java new file mode 100644 index 00000000000..98d0274e689 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.listener.CatalogContext; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** Implementation for TableLineageDataSet. */ +public class TableLineageDatasetImpl implements TableLineageDataset { + @JsonProperty private String name; + @JsonProperty private String namespace; + private CatalogContext catalogContext; + private CatalogBaseTable catalogBaseTable; + @JsonProperty private ObjectPath objectPath; + @JsonProperty private Map<String, LineageDatasetFacet> facets; + + public TableLineageDatasetImpl( + ContextResolvedTable contextResolvedTable, Optional<LineageDataset> lineageDatasetOpt) { + this.name = contextResolvedTable.getIdentifier().asSummaryString(); + this.namespace = + lineageDatasetOpt.map(lineageDataset -> lineageDataset.namespace()).orElse(""); + this.catalogContext = + CatalogContext.createContext( + contextResolvedTable.getCatalog().isPresent() + ? ((AbstractCatalog) contextResolvedTable.getCatalog().get()) + .getName() + : "", + contextResolvedTable.getCatalog().orElse(null)); + this.catalogBaseTable = contextResolvedTable.getTable(); + this.objectPath = + contextResolvedTable.isAnonymous() + ? 
null + : contextResolvedTable.getIdentifier().toObjectPath(); + this.facets = new HashMap<>(); + if (lineageDatasetOpt.isPresent()) { + this.facets.putAll(lineageDatasetOpt.get().facets()); + } + } + + public void addLineageDatasetFacet(LineageDatasetFacet facet) { + facets.put(facet.name(), facet); + } + + @Override + public String name() { + return name; + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public Map<String, LineageDatasetFacet> facets() { + return facets; + } + + @Override + public CatalogContext catalogContext() { + return catalogContext; + } + + @Override + public CatalogBaseTable table() { + return catalogBaseTable; + } + + @Override + public ObjectPath objectPath() { + return objectPath; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageUtils.java new file mode 100644 index 00000000000..f60c4a358f2 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageUtils.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.operations.ModifyType; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** Util class for building table lineage graph. */ +public class TableLineageUtils { + + /** Extract optional lineage info from output format, sink, or sink function. */ + public static Optional<LineageVertex> extractLineageDataset(@Nullable Object object) { + if (object != null && object instanceof LineageVertexProvider) { + return Optional.of(((LineageVertexProvider) object).getLineageVertex()); + } + + return Optional.empty(); + } + + public static LineageDataset createTableLineageDataset( + ContextResolvedTable contextResolvedTable, Optional<LineageVertex> lineageDataset) { + String name = contextResolvedTable.getIdentifier().asSummaryString(); + TableLineageDatasetImpl tableLineageDataset = + new TableLineageDatasetImpl( + contextResolvedTable, findLineageDataset(name, lineageDataset)); + + return tableLineageDataset; + } + + public static ModifyType convert(ChangelogMode inputChangelogMode) { + if (inputChangelogMode.containsOnly(RowKind.INSERT)) { + return ModifyType.INSERT; + } else if (inputChangelogMode.containsOnly(RowKind.DELETE)) { + return ModifyType.DELETE; + } else { + return ModifyType.UPDATE; + } + } + + private static Optional<LineageDataset> findLineageDataset( + String name, Optional<LineageVertex> 
lineageVertexOpt) { + if (lineageVertexOpt.isPresent()) { + LineageVertex lineageVertex = lineageVertexOpt.get(); + if (lineageVertex.datasets().size() == 1) { + return Optional.of(lineageVertex.datasets().get(0)); + } + + for (LineageDataset dataset : lineageVertex.datasets()) { + if (dataset.name().equals(name)) { + return Optional.of(dataset); + } + } + } + + return Optional.empty(); + } + + private static Map<String, String> extractOptions(CatalogBaseTable catalogBaseTable) { + try { + return catalogBaseTable.getOptions(); + } catch (Exception e) { + return new HashMap<>(); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertex.java similarity index 60% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertex.java index 7add1ce88ed..bd92dadceac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertex.java @@ -14,28 +14,21 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- * */ -package org.apache.flink.streaming.api.lineage; +package org.apache.flink.table.planner.lineage; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; - -import java.util.List; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.table.operations.ModifyType; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ +/** Sink lineage vertex for table. */ @PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); - - /* Sink lineage vertex list. */ - List<LineageVertex> sinks(); +public interface TableSinkLineageVertex extends LineageVertex { - /* lineage edges from sources to sinks. */ - List<LineageEdge> relations(); + /** + * Modify type, INSERT/UPDATE/DELETE statement, listener can identify different sink by the type + * to determine whether the sink should be added to the lineage. + */ + ModifyType modifyType(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertexImpl.java similarity index 50% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertexImpl.java index 7add1ce88ed..65baec88c13 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSinkLineageVertexImpl.java @@ -14,28 +14,34 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.flink.streaming.api.lineage; +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.table.operations.ModifyType; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ -@PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); +/** Implementation of TableSinkLineageVertex. */ +public class TableSinkLineageVertexImpl implements TableSinkLineageVertex { + @JsonProperty private List<LineageDataset> datasets; + @JsonProperty private ModifyType modifyType; + + public TableSinkLineageVertexImpl(List<LineageDataset> datasets, ModifyType modifyType) { + this.datasets = datasets; + this.modifyType = modifyType; + } - /* Sink lineage vertex list. */ - List<LineageVertex> sinks(); + @Override + public List<LineageDataset> datasets() { + return datasets; + } - /* lineage edges from sources to sinks. 
*/ - List<LineageEdge> relations(); + @Override + public ModifyType modifyType() { + return modifyType; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertex.java similarity index 60% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertex.java index 7add1ce88ed..d4e19985666 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertex.java @@ -14,28 +14,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.flink.streaming.api.lineage; +package org.apache.flink.table.planner.lineage; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; - -import java.util.List; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ +/** Source lineage vertex for table. */ @PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); - - /* Sink lineage vertex list. */ - List<LineageVertex> sinks(); - - /* lineage edges from sources to sinks. 
*/ - List<LineageEdge> relations(); -} +public interface TableSourceLineageVertex extends SourceLineageVertex {} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertexImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertexImpl.java new file mode 100644 index 00000000000..83661226546 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableSourceLineageVertexImpl.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** Implementation of TableSourceLineageVertex. 
*/ +public class TableSourceLineageVertexImpl implements TableSourceLineageVertex { + @JsonProperty private List<LineageDataset> datasets; + @JsonProperty private Boundedness boundedness; + + public TableSourceLineageVertexImpl(List<LineageDataset> datasets, Boundedness boundedness) { + this.datasets = datasets; + this.boundedness = boundedness; + } + + @Override + public List<LineageDataset> datasets() { + return this.datasets; + } + + @Override + public Boundedness boundedness() { + return boundedness; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 0e3aac1f060..f73b314d56a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -122,6 +122,7 @@ import org.apache.flink.table.operations.EndStatementSetOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.ModifyType; import org.apache.flink.table.operations.NopOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; @@ -1353,7 +1354,7 @@ public class SqlNodeToOperationConversion { contextResolvedTable, queryOperation, null, // targetColumns - SinkModifyOperation.ModifyType.DELETE); + ModifyType.DELETE); } private Operation convertUpdate(SqlUpdate sqlUpdate) { @@ -1382,10 +1383,7 @@ public class SqlNodeToOperationConversion { getTargetColumnIndices(contextResolvedTable, sqlUpdate.getTargetColumnList()); return new SinkModifyOperation( - 
contextResolvedTable, - queryOperation, - columnIndices, - SinkModifyOperation.ModifyType.UPDATE); + contextResolvedTable, queryOperation, columnIndices, ModifyType.UPDATE); } private int[][] getTargetColumnIndices( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index ccdfced77bd..e6179dbaa58 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -29,9 +29,12 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.transformations.TransformationWithLineage; import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -50,6 +53,9 @@ import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.connectors.TransformationSinkProvider; 
+import org.apache.flink.table.planner.lineage.TableLineageUtils; +import org.apache.flink.table.planner.lineage.TableSinkLineageVertex; +import org.apache.flink.table.planner.lineage.TableSinkLineageVertexImpl; import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec; import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec; import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec; @@ -174,6 +180,20 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> inputParallelism)); } + Object outputObject = null; + if (runtimeProvider instanceof OutputFormatProvider) { + outputObject = ((OutputFormatProvider) runtimeProvider).createOutputFormat(); + } else if (runtimeProvider instanceof SinkFunctionProvider) { + outputObject = ((SinkFunctionProvider) runtimeProvider).createSinkFunction(); + } else if (runtimeProvider instanceof SinkProvider) { + outputObject = ((SinkProvider) runtimeProvider).createSink(); + } else if (runtimeProvider instanceof SinkV2Provider) { + outputObject = ((SinkV2Provider) runtimeProvider).createSink(); + } + + Optional<LineageVertex> lineageVertexOpt = + TableLineageUtils.extractLineageDataset(outputObject); + // only add materialization if input has change final boolean needMaterialization = !inputInsertOnly && upsertMaterialize; @@ -209,15 +229,31 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> sinkTransform = applyRowKindSetter(sinkTransform, targetRowKind.get(), config); } - return (Transformation<Object>) - applySinkProvider( - sinkTransform, - streamExecEnv, - runtimeProvider, - rowtimeFieldIndex, - sinkParallelism, - config, - classLoader); + LineageDataset tableLineageDataset = + TableLineageUtils.createTableLineageDataset( + tableSinkSpec.getContextResolvedTable(), lineageVertexOpt); + + TableSinkLineageVertex sinkLineageVertex = + new TableSinkLineageVertexImpl( + Arrays.asList(tableLineageDataset), + 
TableLineageUtils.convert(inputChangelogMode)); + + Transformation transformation = + (Transformation<Object>) + applySinkProvider( + sinkTransform, + streamExecEnv, + runtimeProvider, + rowtimeFieldIndex, + sinkParallelism, + config, + classLoader); + + if (transformation instanceof TransformationWithLineage) { + ((TransformationWithLineage<Object>) transformation) + .setLineageVertex(sinkLineageVertex); + } + return transformation; } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java index be5b46ba973..dfdee5208c5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java @@ -29,10 +29,13 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformationWrapper; +import org.apache.flink.streaming.api.transformations.TransformationWithLineage; import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.table.api.TableException; import 
org.apache.flink.table.catalog.ResolvedSchema; @@ -47,6 +50,9 @@ import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.connectors.TransformationScanProvider; import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.lineage.TableLineageUtils; +import org.apache.flink.table.planner.lineage.TableSourceLineageVertex; +import org.apache.flink.table.planner.lineage.TableSourceLineageVertexImpl; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; @@ -67,6 +73,7 @@ import org.apache.flink.types.RowKind; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -121,9 +128,11 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); final int sourceParallelism = deriveSourceParallelism(provider); final boolean sourceParallelismConfigured = isParallelismConfigured(provider); + Optional<LineageVertex> lineageVertex = Optional.empty(); if (provider instanceof SourceFunctionProvider) { final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider; final SourceFunction<RowData> function = sourceFunctionProvider.createSourceFunction(); + lineageVertex = TableLineageUtils.extractLineageDataset(function); sourceTransform = createSourceFunctionTransformation( env, @@ -133,6 +142,20 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> outputTypeInfo, sourceParallelism, sourceParallelismConfigured); + + LineageDataset tableLineageDataset = + TableLineageUtils.createTableLineageDataset( + tableSourceSpec.getContextResolvedTable(), lineageVertex); + + 
TableSourceLineageVertex sourceLineageVertex = + new TableSourceLineageVertexImpl( + Arrays.asList(tableLineageDataset), + provider.isBounded() + ? Boundedness.BOUNDED + : Boundedness.CONTINUOUS_UNBOUNDED); + + ((TransformationWithLineage<RowData>) sourceTransform) + .setLineageVertex(sourceLineageVertex); if (function instanceof ParallelSourceFunction && sourceParallelismConfigured) { meta.fill(sourceTransform); return new SourceTransformationWrapper<>(sourceTransform); @@ -142,12 +165,14 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> } else if (provider instanceof InputFormatProvider) { final InputFormat<RowData, ?> inputFormat = ((InputFormatProvider) provider).createInputFormat(); + lineageVertex = TableLineageUtils.extractLineageDataset(inputFormat); sourceTransform = createInputFormatTransformation( env, inputFormat, outputTypeInfo, meta.getName()); meta.fill(sourceTransform); } else if (provider instanceof SourceProvider) { final Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource(); + lineageVertex = TableLineageUtils.extractLineageDataset(source); // TODO: Push down watermark strategy to source scan sourceTransform = env.fromSource( @@ -175,17 +200,35 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> provider.getClass().getSimpleName() + " is unsupported now."); } + LineageDataset tableLineageDataset = + TableLineageUtils.createTableLineageDataset( + tableSourceSpec.getContextResolvedTable(), lineageVertex); + + TableSourceLineageVertex sourceLineageVertex = + new TableSourceLineageVertexImpl( + Arrays.asList(tableLineageDataset), + provider.isBounded() + ? 
Boundedness.BOUNDED + : Boundedness.CONTINUOUS_UNBOUNDED); + + if (sourceTransform instanceof TransformationWithLineage) { + ((TransformationWithLineage<RowData>) sourceTransform) + .setLineageVertex(sourceLineageVertex); + } + if (sourceParallelismConfigured) { - return applySourceTransformationWrapper( - sourceTransform, - planner.getFlinkContext().getClassLoader(), - outputTypeInfo, - config, - tableSource.getChangelogMode(), - sourceParallelism); - } else { - return sourceTransform; + Transformation<RowData> sourceTransformationWrapper = + applySourceTransformationWrapper( + sourceTransform, + planner.getFlinkContext().getClassLoader(), + outputTypeInfo, + config, + tableSource.getChangelogMode(), + sourceParallelism); + return sourceTransformationWrapper; } + + return sourceTransform; } private boolean isParallelismConfigured(ScanTableSource.ScanRuntimeProvider runtimeProvider) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java index e4b24d82e7c..69ebba90bd7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java @@ -32,14 +32,22 @@ import org.apache.flink.connector.source.split.ValuesSourceSplit; import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import 
org.apache.flink.table.api.TableException; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import java.io.ByteArrayOutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,7 +60,9 @@ import java.util.stream.IntStream; * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we * introduce a new source for testing in table module. */ -public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnumState> { +public class ValuesSource + implements Source<RowData, ValuesSourceSplit, NoOpEnumState>, LineageVertexProvider { + private static final String LINEAGE_NAMESPACE = "values://ValuesSource"; private final TypeSerializer<RowData> serializer; private final List<byte[]> serializedElements; @@ -126,4 +136,36 @@ public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnum public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() { return new NoOpEnumStateSerializer(); } + + @Override + public LineageVertex getLineageVertex() { + return new SourceLineageVertex() { + @Override + public Boundedness boundedness() { + return boundedness; + } + + @Override + public List<LineageDataset> datasets() { + LineageDataset dataset = + new LineageDataset() { + @Override + public String name() { + return ""; + } + + @Override + public String namespace() { + return LINEAGE_NAMESPACE; + } + + @Override + public Map<String, LineageDatasetFacet> facets() { + return new HashMap<>(); + } + }; + return Arrays.asList(dataset); + } + }; + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index acc4cf4a0cd..0f8d0dd70a9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -35,6 +36,11 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.table.connector.RuntimeConverter; import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter; import org.apache.flink.table.connector.source.LookupTableSource; @@ -182,6 +188,16 @@ final class TestValuesRuntimeFunctions { } } + static LineageVertex createLineageVertex(String name, String namespace) { + return new LineageVertex() { + + @Override + public List<LineageDataset> datasets() { + return Arrays.asList(new 
DefaultLineageDataset(name, namespace, new HashMap<>())); + } + }; + } + private static String rowToString(Row row) { if (RowUtils.USE_LEGACY_TO_STRING) { return String.format("%s(%s)", row.getKind().shortString(), row); @@ -194,7 +210,10 @@ final class TestValuesRuntimeFunctions { // Source Function implementations // ------------------------------------------------------------------------------------------ - public static class FromElementSourceFunctionWithWatermark implements SourceFunction<RowData> { + public static class FromElementSourceFunctionWithWatermark + implements SourceFunction<RowData>, LineageVertexProvider { + private static final String LINEAGE_NAMESPACE = + "values://FromElementSourceFunctionWithWatermark"; /** The (de)serializer to be used for the data elements. */ private final TypeSerializer<RowData> serializer; @@ -278,6 +297,23 @@ final class TestValuesRuntimeFunctions { isRunning = false; } + @Override + public LineageVertex getLineageVertex() { + return new SourceLineageVertex() { + @Override + public Boundedness boundedness() { + return Boundedness.BOUNDED; + } + + @Override + public List<LineageDataset> datasets() { + return Arrays.asList( + new DefaultLineageDataset( + tableName, LINEAGE_NAMESPACE, new HashMap<>())); + } + }; + } + private class TestValuesWatermarkOutput implements WatermarkOutput { SourceContext<RowData> ctx; @@ -314,7 +350,7 @@ final class TestValuesRuntimeFunctions { * restoring in streaming sql. 
*/ private abstract static class AbstractExactlyOnceSink extends RichSinkFunction<RowData> - implements CheckpointedFunction { + implements CheckpointedFunction, LineageVertexProvider { private static final long serialVersionUID = 1L; protected final String tableName; @@ -359,6 +395,13 @@ final class TestValuesRuntimeFunctions { } } + @Override + public LineageVertex getLineageVertex() { + return createLineageVertex(tableName, getNamespace()); + } + + abstract String getNamespace(); + protected void addLocalRawResult(Row row) { localRawResult.add(row); Optional.ofNullable(localRawResultsObservers.get(tableName)) @@ -374,7 +417,7 @@ final class TestValuesRuntimeFunctions { } static class AppendingSinkFunction extends AbstractExactlyOnceSink { - + private static final String LINEAGE_NAMESPACE = "values://AppendingSinkFunction"; private static final long serialVersionUID = 1L; private final int rowtimeIndex; @@ -407,6 +450,11 @@ final class TestValuesRuntimeFunctions { "AppendingSinkFunction received " + value.getRowKind() + " messages."); } } + + @Override + public String getNamespace() { + return LINEAGE_NAMESPACE; + } } /** @@ -414,6 +462,7 @@ final class TestValuesRuntimeFunctions { * databases. 
*/ static class KeyedUpsertingSinkFunction extends AbstractExactlyOnceSink { + private static final String LINEAGE_NAMESPACE = "values://KeyedUpsertingSinkFunction"; private static final long serialVersionUID = 1L; private final int[] keyIndices; private final int[] targetColumnIndices; @@ -510,9 +559,15 @@ final class TestValuesRuntimeFunctions { return old; } } + + @Override + public String getNamespace() { + return LINEAGE_NAMESPACE; + } } static class RetractingSinkFunction extends AbstractExactlyOnceSink { + private static final String LINEAGE_NAMESPACE = "values://RetractingSinkFunction"; private static final long serialVersionUID = 1L; protected transient ListState<Row> retractResultState; @@ -580,10 +635,16 @@ final class TestValuesRuntimeFunctions { addLocalRawResult(row); } } - } - static class AppendingOutputFormat extends RichOutputFormat<RowData> { + @Override + public String getNamespace() { + return LINEAGE_NAMESPACE; + } + } + static class AppendingOutputFormat extends RichOutputFormat<RowData> + implements LineageVertexProvider { + private static final String LINEAGE_NAMESPACE = "values://AppendingOutputFormat"; private static final long serialVersionUID = 1L; private final String tableName; private final DataStructureConverter converter; @@ -633,6 +694,11 @@ final class TestValuesRuntimeFunctions { } } + @Override + public LineageVertex getLineageVertex() { + return createLineageVertex(tableName, LINEAGE_NAMESPACE); + } + @Override public void close() throws IOException { // nothing to do @@ -647,8 +713,9 @@ final class TestValuesRuntimeFunctions { * A lookup function which find matched rows with the given fields. NOTE: We have to declare it * as public because it will be used in code generation. 
*/ - public static class TestValuesLookupFunction extends LookupFunction { - + public static class TestValuesLookupFunction extends LookupFunction + implements LineageVertexProvider { + private static final String LINEAGE_NAMESPACE = "values://TestValuesLookupFunction"; private static final long serialVersionUID = 1L; private final List<Row> data; private final int[] lookupIndices; @@ -703,6 +770,11 @@ final class TestValuesRuntimeFunctions { return indexedData.get(keyRow); } + @Override + public LineageVertex getLineageVertex() { + return createLineageVertex("", LINEAGE_NAMESPACE); + } + @Override public void close() throws Exception { RESOURCE_COUNTER.decrementAndGet(); @@ -740,8 +812,9 @@ final class TestValuesRuntimeFunctions { * An async lookup function which find matched rows with the given fields. NOTE: We have to * declare it as public because it will be used in code generation. */ - public static class AsyncTestValueLookupFunction extends AsyncLookupFunction { - + public static class AsyncTestValueLookupFunction extends AsyncLookupFunction + implements LineageVertexProvider { + private static final String LINEAGE_NAMESPACE = "values://TestValuesLookupFunction"; private static final long serialVersionUID = 1L; private final List<Row> data; private final int[] lookupIndices; @@ -809,6 +882,11 @@ final class TestValuesRuntimeFunctions { executor); } + @Override + public LineageVertex getLineageVertex() { + return createLineageVertex("", LINEAGE_NAMESPACE); + } + @Override public void close() throws Exception { RESOURCE_COUNTER.decrementAndGet(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TableLineageGraphTest.java similarity index 55% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TableLineageGraphTest.java index 7add1ce88ed..ec04c992b3a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TableLineageGraphTest.java @@ -14,28 +14,24 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.flink.streaming.api.lineage; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; +package org.apache.flink.table.planner.plan.batch.sql; -import java.util.List; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.plan.common.TableLineageGraphTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ -@PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); +/** Lineage Graph tests for various queries in batch. */ +public class TableLineageGraphTest extends TableLineageGraphTestBase { - /* Sink lineage vertex list. */ - List<LineageVertex> sinks(); + @Override + protected TableTestUtil getTableTestUtil() { + return batchTestUtil(TableConfig.getDefault()); + } - /* lineage edges from sources to sinks.
*/ - List<LineageEdge> relations(); + @Override + protected boolean isBatchMode() { + return true; + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java new file mode 100644 index 00000000000..fc097cd51e4 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.common; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.lineage.LineageGraph; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature; + +import net.javacrumbs.jsonunit.core.Option; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; + +import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; + +/** Lineage Graph tests for various queries.
*/ +public abstract class TableLineageGraphTestBase extends TableTestBase { + private static final String RESOURCE_PATH = "src/test/resources/lineage-graph/"; + private final ObjectMapper mapper = new ObjectMapper(); + private TableTestUtil util; + + protected abstract boolean isBatchMode(); + + protected abstract TableTestUtil getTableTestUtil(); + + private final String query = + "SELECT\n" + + " AVG(a) AS avg_a,\n" + + " COUNT(*) AS cnt,\n" + + " count(b) AS cnt_b,\n" + + " min(b) AS min_b,\n" + + " MAX(c) FILTER (WHERE a > 1) AS max_c\n" + + "FROM FirstTable"; + + private final String lookupJoin = ""; + + private final String union = + "(" + + query + + ") UNION \n" + + "(SELECT\n" + + " AVG(a) AS avg_a,\n" + + " COUNT(*) AS cnt,\n" + + " count(b) AS cnt_b,\n" + + " min(b) AS min_b,\n" + + " MAX(c) FILTER (WHERE a > 1) AS max_c\n" + + "FROM SecondTable)"; + + @BeforeEach + void setup() { + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + util = getTableTestUtil(); + + util.getTableEnv() + .executeSql( + "CREATE TABLE FirstTable (\n" + + " a BIGINT,\n" + + " b INT NOT NULL,\n" + + " c VARCHAR,\n" + + " d BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = '" + + isBatchMode() + + "')"); + + util.getTableEnv() + .executeSql( + "CREATE TABLE SecondTable (\n" + + " a BIGINT,\n" + + " b INT NOT NULL,\n" + + " c VARCHAR,\n" + + " d BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = '" + + isBatchMode() + + "')"); + + util.getTableEnv() + .executeSql( + "CREATE TABLE SinkTable (\n" + + " avg_a DOUBLE,\n" + + " cnt BIGINT,\n" + + " cnt_b BIGINT,\n" + + " min_b BIGINT,\n" + + " max_c VARCHAR\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false')"); + } + + @Test + void testInsertWithSelect() throws Exception { + List<Transformation<?>> transformations = + util.generateTransformations(String.format("INSERT INTO SinkTable\n%s", query)); + LineageGraph lineageGraph = 
generateLineageGraph(transformations); + verify(lineageGraph, isBatchMode() ? "query-batch.json" : "query-stream.json"); + } + + @Test + void testInsertWithUnion() throws Exception { + List<Transformation<?>> transformations = + util.generateTransformations(String.format("INSERT INTO SinkTable\n%s", union)); + LineageGraph lineageGraph = generateLineageGraph(transformations); + verify(lineageGraph, isBatchMode() ? "union-batch.json" : "union-stream.json"); + } + + private LineageGraph generateLineageGraph(List<Transformation<?>> transformations) { + StreamGraphGenerator streamGraphGenerator = + new StreamGraphGenerator( + transformations, new ExecutionConfig(), new CheckpointConfig()); + StreamGraph graph = streamGraphGenerator.generate(); + LineageGraph lineageGraph = graph.getLineageGraph(); + return lineageGraph; + } + + private void verify(LineageGraph lineageGraph, String jsonPath) throws Exception { + String json = mapper.writeValueAsString(lineageGraph); + String expected = new String(Files.readAllBytes(Paths.get(RESOURCE_PATH + jsonPath))); + assertThatJson(json).when(Option.IGNORING_ARRAY_ORDER).isEqualTo(expected); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java index 43157e32a83..a1de42f5829 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.exec; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import 
org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.WithBoundedness; import org.apache.flink.table.api.CompiledPlan; @@ -34,6 +35,7 @@ import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.CompiledPlanUtils; +import org.apache.flink.table.planner.lineage.TableSourceLineageVertex; import org.apache.flink.table.planner.utils.JsonTestUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -84,6 +86,15 @@ class TransformationsTest { assertBoundedness(Boundedness.BOUNDED, sourceTransform); assertThat(sourceTransform.getOperator().emitsProgressiveWatermarks()).isFalse(); + + assertThat(sourceTransform.getLineageVertex()).isNotNull(); + assertThat(((TableSourceLineageVertex) sourceTransform.getLineageVertex()).boundedness()) + .isEqualTo(Boundedness.BOUNDED); + + List<LineageDataset> datasets = sourceTransform.getLineageVertex().datasets(); + assertThat(datasets.size()).isEqualTo(1); + assertThat(datasets.get(0).name()).contains("*anonymous_values$"); + assertThat(datasets.get(0).namespace()).isEqualTo("values://FromElementsFunction"); } @Test @@ -105,6 +116,15 @@ class TransformationsTest { assertBoundedness(Boundedness.CONTINUOUS_UNBOUNDED, sourceTransform); assertThat(sourceTransform.getOperator().emitsProgressiveWatermarks()).isTrue(); + + assertThat(sourceTransform.getLineageVertex()).isNotNull(); + assertThat(((TableSourceLineageVertex) sourceTransform.getLineageVertex()).boundedness()) + .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + + List<LineageDataset> datasets = sourceTransform.getLineageVertex().datasets(); + assertThat(datasets.size()).isEqualTo(1); + assertThat(datasets.get(0).name()).contains("*anonymous_values$"); + 
assertThat(datasets.get(0).namespace()).isEqualTo("values://FromElementsFunction"); } @Test diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/TableLineageGraphTest.java similarity index 55% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java copy to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/TableLineageGraphTest.java index 7add1ce88ed..34d6140185d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageGraph.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/TableLineageGraphTest.java @@ -14,28 +14,24 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ -package org.apache.flink.streaming.api.lineage; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.graph.StreamGraph; +package org.apache.flink.table.planner.plan.stream.sql; -import java.util.List; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.plan.common.TableLineageGraphTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; -/** - * Job lineage is built according to {@link StreamGraph}, users can get sources, sinks and - * relationships from lineage and manage the relationship between jobs and tables. - */ -@PublicEvolving -public interface LineageGraph { - /* Source lineage vertex list. */ - List<SourceLineageVertex> sources(); +/** Lineage Graph tests for various queries in stream. */ +public class TableLineageGraphTest extends TableLineageGraphTestBase { - /* Sink lineage vertex list.
*/ - List<LineageVertex> sinks(); + @Override + protected TableTestUtil getTableTestUtil() { + return streamTestUtil(TableConfig.getDefault()); + } - /* lineage edges from sources to sinks. */ - List<LineageEdge> relations(); + @Override + protected boolean isBatchMode() { + return false; + } } diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-batch.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-batch.json new file mode 100644 index 00000000000..bd1f60a00ca --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-batch.json @@ -0,0 +1,70 @@ +{ + "lineageEdges": [ + { + "sourceLineageVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "BOUNDED" + }, + "sinkVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "INSERT" + } + } + ], + "sources": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "BOUNDED" + } + ], + "sinks": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + 
"modifyType": "INSERT" + } + ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-stream.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-stream.json new file mode 100644 index 00000000000..485f6e919fc --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/query-stream.json @@ -0,0 +1,70 @@ +{ + "lineageEdges": [ + { + "sourceLineageVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "CONTINUOUS_UNBOUNDED" + }, + "sinkVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "UPDATE" + } + } + ], + "sources": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "CONTINUOUS_UNBOUNDED" + } + ], + "sinks": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "UPDATE" + } + ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-batch.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-batch.json new file mode 100644 
index 00000000000..18a195dd8de --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-batch.json @@ -0,0 +1,117 @@ +{ + "lineageEdges": [ + { + "sourceLineageVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "BOUNDED" + }, + "sinkVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "INSERT" + } + }, + { + "sourceLineageVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SecondTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SecondTable", + "fullName": "default_database.SecondTable" + }, + "facets": {} + } + ], + "boundedness": "BOUNDED" + }, + "sinkVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "INSERT" + } + } + ], + "sources": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "BOUNDED" + }, + { + "datasets": [ + { + "name": "default_catalog.default_database.SecondTable", + "namespace": 
"values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SecondTable", + "fullName": "default_database.SecondTable" + }, + "facets": {} + } + ], + "boundedness": "BOUNDED" + } + ], + "sinks": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "INSERT" + } + ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-stream.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-stream.json new file mode 100644 index 00000000000..66e641b7a77 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/union-stream.json @@ -0,0 +1,117 @@ +{ + "lineageEdges": [ + { + "sourceLineageVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "CONTINUOUS_UNBOUNDED" + }, + "sinkVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "UPDATE" + } + }, + { + "sourceLineageVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SecondTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SecondTable", + "fullName": "default_database.SecondTable" + }, + "facets": {} + } + ], + "boundedness": "CONTINUOUS_UNBOUNDED" + 
}, + "sinkVertex": { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "UPDATE" + } + } + ], + "sources": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.FirstTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "FirstTable", + "fullName": "default_database.FirstTable" + }, + "facets": {} + } + ], + "boundedness": "CONTINUOUS_UNBOUNDED" + }, + { + "datasets": [ + { + "name": "default_catalog.default_database.SecondTable", + "namespace": "values://FromElementsFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SecondTable", + "fullName": "default_database.SecondTable" + }, + "facets": {} + } + ], + "boundedness": "CONTINUOUS_UNBOUNDED" + } + ], + "sinks": [ + { + "datasets": [ + { + "name": "default_catalog.default_database.SinkTable", + "namespace": "values://RetractingSinkFunction", + "objectPath": { + "databaseName": "default_database", + "objectName": "SinkTable", + "fullName": "default_database.SinkTable" + }, + "facets": {} + } + ], + "modifyType": "UPDATE" + } + ] +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index a4c8d0ec1f7..4b19ef16150 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.utils import org.apache.flink.FlinkVersion import 
org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.dag.Transformation import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.BatchExecutionOptions @@ -27,6 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode import org.apache.flink.streaming.api.{environment, TimeCharacteristic} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment} +import org.apache.flink.streaming.api.graph.StreamGraph import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => JavaStreamTableEnv} @@ -946,6 +948,21 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) withQueryBlockAlias = false) } + /** + * Translate the INSERT statement into its list of transformations. + * + * @param insert + * the INSERT statement to translate + */ + def generateTransformations(insert: String): util.List[Transformation[_]] = { + val stmtSet = getTableEnv.createStatementSet() + stmtSet.addInsertSql(insert) + + val testStmtSet = stmtSet.asInstanceOf[StatementSetImpl[_]] + val operations = testStmtSet.getOperations; + getPlanner.translate(operations) + } + /** * Verify the expected plans translated from the given [[Table]].
* diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java index e0d7e31114b..f430f86e0c1 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/values/ValuesInputFormat.java @@ -20,8 +20,14 @@ package org.apache.flink.table.runtime.operators.values; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedInput; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -30,11 +36,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; /** Generated ValuesInputFormat. 
*/ public class ValuesInputFormat extends GenericInputFormat<RowData> - implements NonParallelInput, ResultTypeQueryable<RowData> { - + implements NonParallelInput, ResultTypeQueryable<RowData>, LineageVertexProvider { + private static final String LINEAGE_NAMESPACE = "values://ValuesInputFormat"; private static final Logger LOG = LoggerFactory.getLogger(ValuesInputFormat.class); private static final long serialVersionUID = 1L; @@ -75,4 +84,20 @@ public class ValuesInputFormat extends GenericInputFormat<RowData> public InternalTypeInfo<RowData> getProducedType() { return returnType; } + + @Override + public LineageVertex getLineageVertex() { + return new SourceLineageVertex() { + @Override + public Boundedness boundedness() { + return Boundedness.BOUNDED; + } + + @Override + public List<LineageDataset> datasets() { + return Arrays.asList( + new DefaultLineageDataset("", LINEAGE_NAMESPACE, new HashMap<>())); + } + }; + } } diff --git a/pom.xml b/pom.xml index f5a3961b46b..1449c6801d8 100644 --- a/pom.xml +++ b/pom.xml @@ -1679,6 +1679,7 @@ under the License. <exclude>flink-table/flink-table-planner/src/test/resources/**/*.out</exclude> <exclude>flink-table/flink-table-planner/src/test/resources/json/*.json</exclude> <exclude>flink-table/flink-table-planner/src/test/resources/jsonplan/*.json</exclude> + <exclude>flink-table/flink-table-planner/src/test/resources/lineage-graph/*.json</exclude> <exclude>flink-table/flink-table-planner/src/test/resources/restore-tests/**</exclude> <exclude>flink-yarn/src/test/resources/krb5.keytab</exclude> <exclude>flink-end-to-end-tests/test-scripts/test-data/**</exclude>