This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bcacea9ca37 [FLINK-34657] extract lineage info for stream API (#25056)
bcacea9ca37 is described below

commit bcacea9ca376be62e83f9927393af0418f6f0711
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Sat Jul 20 07:01:31 2024 -0700

    [FLINK-34657] extract lineage info for stream API (#25056)
---
 .../streaming/api/datastream/DataStreamSink.java   |  12 +-
 .../api/lineage/DefaultLineageVertex.java          |  43 +++++
 .../api/lineage/DefaultSourceLineageVertex.java    |  52 ++++++
 .../LegacySourceTransformation.java                |  10 ++
 .../api/transformations/SourceTransformation.java  |   9 +
 .../api/lineage/LineageGraphUtilsTest.java         | 188 +++++++++++++++++++++
 .../execution/JobStatusChangedListenerITCase.java  |  68 +++++++-
 7 files changed, 379 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 7b8d241e6ec..7bcc7b3e95e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -55,13 +56,18 @@ public class DataStreamSink<T> {
         StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction);
         final StreamExecutionEnvironment executionEnvironment =
                 inputStream.getExecutionEnvironment();
-        PhysicalTransformation<T> transformation =
+        LegacySinkTransformation<T> transformation =
                 new LegacySinkTransformation<>(
                         inputStream.getTransformation(),
                         "Unnamed",
                         sinkOperator,
                         executionEnvironment.getParallelism(),
                         false);
+        if (sinkFunction instanceof LineageVertexProvider) {
+            transformation.setLineageVertex(
+                    ((LineageVertexProvider) sinkFunction).getLineageVertex());
+        }
+
         executionEnvironment.addOperator(transformation);
         return new DataStreamSink<>(transformation);
     }
@@ -82,6 +88,10 @@ public class DataStreamSink<T> {
                         executionEnvironment.getParallelism(),
                         false,
                         customSinkOperatorUidHashes);
+        if (sink instanceof LineageVertexProvider) {
+            transformation.setLineageVertex(((LineageVertexProvider) sink).getLineageVertex());
+        }
+
         executionEnvironment.addOperator(transformation);
         return new DataStreamSink<>(transformation);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
new file mode 100644
index 00000000000..289021e3d2d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Default implementation for {@link LineageVertex}. */
+@Internal
+public class DefaultLineageVertex implements LineageVertex {
+    private List<LineageDataset> lineageDatasets;
+
+    public DefaultLineageVertex() {
+        this.lineageDatasets = new ArrayList<>();
+    }
+
+    public void addLineageDataset(LineageDataset lineageDataset) {
+        this.lineageDatasets.add(lineageDataset);
+    }
+
+    @Override
+    public List<LineageDataset> datasets() {
+        return lineageDatasets;
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
new file mode 100644
index 00000000000..fbc4ac4b2d4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Default implementation for {@link SourceLineageVertex}. */
+@Internal
+public class DefaultSourceLineageVertex implements SourceLineageVertex {
+    private Boundedness boundedness;
+    private List<LineageDataset> lineageDatasets;
+
+    public DefaultSourceLineageVertex(Boundedness boundedness) {
+        this.lineageDatasets = new ArrayList<>();
+        this.boundedness = boundedness;
+    }
+
+    public void addDataset(LineageDataset lineageDataset) {
+        this.lineageDatasets.add(lineageDataset);
+    }
+
+    @Override
+    public List<LineageDataset> datasets() {
+        return this.lineageDatasets;
+    }
+
+    @Override
+    public Boundedness boundedness() {
+        return this.boundedness;
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
index 51b101e47b6..c77fc4d154b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -69,6 +71,7 @@ public class LegacySourceTransformation<T> extends TransformationWithLineage<T>
         super(name, outputType, parallelism, parallelismConfigured);
         this.operatorFactory = checkNotNull(SimpleOperatorFactory.of(operator));
         this.boundedness = checkNotNull(boundedness);
+        this.extractLineageVertex(operator);
     }
 
     /** Mutable for legacy sources in the Table API. */
@@ -105,4 +108,11 @@ public class LegacySourceTransformation<T> extends TransformationWithLineage<T>
     public final void setChainingStrategy(ChainingStrategy strategy) {
         operatorFactory.setChainingStrategy(strategy);
     }
+
+    private void extractLineageVertex(StreamSource<T, ?> operator) {
+        SourceFunction sourceFunction = operator.getUserFunction();
+        if (sourceFunction instanceof LineageVertexProvider) {
+            setLineageVertex(((LineageVertexProvider) sourceFunction).getLineageVertex());
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index c5190d4477d..18389746a74 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import javax.annotation.Nullable;
@@ -64,6 +65,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
         super(name, outputType, parallelism);
         this.source = source;
         this.watermarkStrategy = watermarkStrategy;
+        this.extractLineageVertex();
     }
 
     public SourceTransformation(
@@ -76,6 +78,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
         super(name, outputType, parallelism, parallelismConfigured);
         this.source = source;
         this.watermarkStrategy = watermarkStrategy;
+        this.extractLineageVertex();
     }
 
     public Source<OUT, SplitT, EnumChkT> getSource() {
@@ -118,4 +121,10 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
     public String getCoordinatorListeningID() {
         return coordinatorListeningID;
     }
+
+    private void extractLineageVertex() {
+        if (source instanceof LineageVertexProvider) {
+            setLineageVertex(((LineageVertexProvider) source).getLineageVertex());
+        }
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
new file mode 100644
index 00000000000..75898fedee5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Testing for lineage graph util. */
+class LineageGraphUtilsTest {
+    private static final String SOURCE_DATASET_NAME = "LineageSource";
+    private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource";
+    private static final String SINK_DATASET_NAME = "LineageSink";
+    private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";
+
+    private static final String LEGACY_SOURCE_DATASET_NAME = "LineageSourceFunction";
+    private static final String LEGACY_SOURCE_DATASET_NAMESPACE = "source://LineageSourceFunction";
+    private static final String LEGACY_SINK_DATASET_NAME = "LineageSinkFunction";
+    private static final String LEGACY_SINK_DATASET_NAMESPACE = "sink://LineageSinkFunction";
+
+    @Test
+    void testExtractLineageGraphFromLegacyTransformations() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Long> source = env.addSource(new LineageSourceFunction());
+        DataStreamSink<Long> sink = source.addSink(new LineageSinkFunction());
+
+        LineageGraph lineageGraph =
+                LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));
+
+        assertThat(lineageGraph.sources().size()).isEqualTo(1);
+        assertThat(lineageGraph.sources().get(0).boundedness())
+                .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+        assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
+                .isEqualTo(LEGACY_SOURCE_DATASET_NAME);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
+                .isEqualTo(LEGACY_SOURCE_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.sinks().size()).isEqualTo(1);
+        assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
+                .isEqualTo(LEGACY_SINK_DATASET_NAME);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
+                .isEqualTo(LEGACY_SINK_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.relations().size()).isEqualTo(1);
+    }
+
+    @Test
+    void testExtractLineageGraphFromTransformations() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Long> source =
+                env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), "");
+        DataStreamSink<Long> sink = source.sinkTo(new LineageSink());
+
+        LineageGraph lineageGraph =
+                LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));
+
+        assertThat(lineageGraph.sources().size()).isEqualTo(1);
+        assertThat(lineageGraph.sources().get(0).boundedness()).isEqualTo(Boundedness.BOUNDED);
+        assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
+                .isEqualTo(SOURCE_DATASET_NAME);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
+                .isEqualTo(SOURCE_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.sinks().size()).isEqualTo(1);
+        assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
+                .isEqualTo(SINK_DATASET_NAME);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
+                .isEqualTo(SINK_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.relations().size()).isEqualTo(1);
+    }
+
+    private static class LineageSink extends DiscardingSink<Long> implements LineageVertexProvider {
+        public LineageSink() {
+            super();
+        }
+
+        @Override
+        public LineageVertex getLineageVertex() {
+            LineageDataset lineageDataset =
+                    new DefaultLineageDataset(
+                            SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
+            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
+            lineageVertex.addLineageDataset(lineageDataset);
+            return lineageVertex;
+        }
+    }
+
+    private static class LineageSource extends NumberSequenceSource
+            implements LineageVertexProvider {
+
+        public LineageSource(long from, long to) {
+            super(from, to);
+        }
+
+        @Override
+        public LineageVertex getLineageVertex() {
+            LineageDataset lineageDataset =
+                    new DefaultLineageDataset(
+                            SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
+            DefaultSourceLineageVertex lineageVertex =
+                    new DefaultSourceLineageVertex(Boundedness.BOUNDED);
+            lineageVertex.addDataset(lineageDataset);
+            return lineageVertex;
+        }
+    }
+
+    private static class LineageSourceFunction
+            implements SourceFunction<Long>, LineageVertexProvider {
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Long> ctx) throws Exception {
+            long next = 0;
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(next++);
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        @Override
+        public LineageVertex getLineageVertex() {
+            LineageDataset lineageDataset =
+                    new DefaultLineageDataset(
+                            LEGACY_SOURCE_DATASET_NAME,
+                            LEGACY_SOURCE_DATASET_NAMESPACE,
+                            new HashMap<>());
+            DefaultSourceLineageVertex lineageVertex =
+                    new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
+            lineageVertex.addDataset(lineageDataset);
+            return lineageVertex;
+        }
+    }
+
+    private static class LineageSinkFunction implements SinkFunction<Long>, LineageVertexProvider {
+
+        @Override
+        public LineageVertex getLineageVertex() {
+            LineageDataset lineageDataset =
+                    new DefaultLineageDataset(
+                            LEGACY_SINK_DATASET_NAME,
+                            LEGACY_SINK_DATASET_NAMESPACE,
+                            new HashMap<>());
+            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
+            lineageVertex.addLineageDataset(lineageDataset);
+            return lineageVertex;
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
index dd58ef184dc..8530aec01c6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
@@ -36,7 +37,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
+import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
+import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageGraph;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.streaming.runtime.execution.JobCreatedEvent;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.TestLogger;
@@ -51,6 +60,7 @@ import org.junit.runners.MethodSorters;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
@@ -61,6 +71,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class JobStatusChangedListenerITCase extends TestLogger {
     private static final int PARALLELISM = 4;
+    private static final String SOURCE_DATASET_NAME = "LineageSource";
+    private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource";
+    private static final String SINK_DATASET_NAME = "LineageSink";
+    private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";
+
     @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
     @ClassRule
@@ -162,6 +177,7 @@ public class JobStatusChangedListenerITCase extends TestLogger {
             StreamGraph streamGraph = env.getStreamGraph();
             JobGraph jobGraph = streamGraph.getJobGraph();
 
+            verifyLineageGraph(streamGraph.getLineageGraph());
             ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
             JobID jobID = client.submitJob(jobGraph).get();
             waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false);
@@ -183,9 +199,35 @@ public class JobStatusChangedListenerITCase extends TestLogger {
                                             || (status.oldStatus() == JobStatus.CANCELLING
                                                     && status.newStatus() == JobStatus.CANCELED))
                             .isTrue();
+
+                    if (event instanceof JobCreatedEvent) {
+                        LineageGraph lineageGraph = ((JobCreatedEvent) event).lineageGraph();
+                        assertThat(lineageGraph.sources().size()).isEqualTo(1);
+                        assertThat(lineageGraph.sinks().size()).isEqualTo(1);
+                    }
                 });
     }
 
+    void verifyLineageGraph(LineageGraph lineageGraph) {
+        assertThat(lineageGraph.sources().size()).isEqualTo(1);
+        assertThat(lineageGraph.sources().get(0).boundedness())
+                .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+        assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
+                .isEqualTo(SOURCE_DATASET_NAME);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
+                .isEqualTo(SOURCE_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.sinks().size()).isEqualTo(1);
+        assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
+                .isEqualTo(SINK_DATASET_NAME);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
+                .isEqualTo(SINK_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.relations().size()).isEqualTo(1);
+    }
+
     void verifyEventMetaData() {
         assertThat(statusChangedEvents.size()).isEqualTo(3);
         assertThat(statusChangedEvents.get(0).jobId())
@@ -238,7 +280,8 @@ public class JobStatusChangedListenerITCase extends TestLogger {
         public void cancel() {}
     }
 
-    private static class InfiniteLongSourceFunction implements SourceFunction<Long> {
+    private static class InfiniteLongSourceFunction
+            implements SourceFunction<Long>, LineageVertexProvider {
         private volatile boolean running = true;
 
         @Override
@@ -255,12 +298,33 @@ public class JobStatusChangedListenerITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
+
+        @Override
+        public LineageVertex getLineageVertex() {
+            LineageDataset lineageDataset =
+                    new DefaultLineageDataset(
+                            SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
+            DefaultSourceLineageVertex lineageVertex =
+                    new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
+            lineageVertex.addDataset(lineageDataset);
+            return lineageVertex;
+        }
     }
 
-    private static class SleepingSink implements SinkFunction<Long> {
+    private static class SleepingSink implements SinkFunction<Long>, LineageVertexProvider {
         @Override
         public void invoke(Long value, Context context) throws Exception {
             Thread.sleep(1_000);
         }
+
+        @Override
+        public LineageVertex getLineageVertex() {
+            LineageDataset lineageDataset =
+                    new DefaultLineageDataset(
+                            SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
+            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
+            lineageVertex.addLineageDataset(lineageDataset);
+            return lineageVertex;
+        }
     }
 }

Reply via email to