This is an automated email from the ASF dual-hosted git repository. zjureel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new bcacea9ca37 [FLINK-34657] extract lineage info for stream API (#25056) bcacea9ca37 is described below commit bcacea9ca376be62e83f9927393af0418f6f0711 Author: Peter Huang <huangzhenqiu0...@gmail.com> AuthorDate: Sat Jul 20 07:01:31 2024 -0700 [FLINK-34657] extract lineage info for stream API (#25056) --- .../streaming/api/datastream/DataStreamSink.java | 12 +- .../api/lineage/DefaultLineageVertex.java | 43 +++++ .../api/lineage/DefaultSourceLineageVertex.java | 52 ++++++ .../LegacySourceTransformation.java | 10 ++ .../api/transformations/SourceTransformation.java | 9 + .../api/lineage/LineageGraphUtilsTest.java | 188 +++++++++++++++++++++ .../execution/JobStatusChangedListenerITCase.java | 68 +++++++- 7 files changed, 379 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 7b8d241e6ec..7bcc7b3e95e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -27,6 +27,7 @@ import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; @@ -55,13 +56,18 @@ public class DataStreamSink<T> { StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction); final StreamExecutionEnvironment executionEnvironment = inputStream.getExecutionEnvironment(); - PhysicalTransformation<T> transformation = + LegacySinkTransformation<T> transformation = new LegacySinkTransformation<>( inputStream.getTransformation(), "Unnamed", sinkOperator, executionEnvironment.getParallelism(), false); + if (sinkFunction instanceof LineageVertexProvider) { + transformation.setLineageVertex( + ((LineageVertexProvider) sinkFunction).getLineageVertex()); + } + executionEnvironment.addOperator(transformation); return new DataStreamSink<>(transformation); } @@ -82,6 +88,10 @@ public class DataStreamSink<T> { executionEnvironment.getParallelism(), false, customSinkOperatorUidHashes); + if (sink instanceof LineageVertexProvider) { + transformation.setLineageVertex(((LineageVertexProvider) sink).getLineageVertex()); + } + executionEnvironment.addOperator(transformation); return new DataStreamSink<>(transformation); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java new file mode 100644 index 00000000000..289021e3d2d --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.Internal; + +import java.util.ArrayList; +import java.util.List; + +/** Default implementation for {@link LineageVertex}. */ +@Internal +public class DefaultLineageVertex implements LineageVertex { + private List<LineageDataset> lineageDatasets; + + public DefaultLineageVertex() { + this.lineageDatasets = new ArrayList<>(); + } + + public void addLineageDataset(LineageDataset lineageDataset) { + this.lineageDatasets.add(lineageDataset); + } + + @Override + public List<LineageDataset> datasets() { + return lineageDatasets; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java new file mode 100644 index 00000000000..fbc4ac4b2d4 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.Boundedness; + +import java.util.ArrayList; +import java.util.List; + +/** Default implementation for {@link SourceLineageVertex}. */ +@Internal +public class DefaultSourceLineageVertex implements SourceLineageVertex { + private Boundedness boundedness; + private List<LineageDataset> lineageDatasets; + + public DefaultSourceLineageVertex(Boundedness boundedness) { + this.lineageDatasets = new ArrayList<>(); + this.boundedness = boundedness; + } + + public void addDataset(LineageDataset lineageDataset) { + this.lineageDatasets.add(lineageDataset); + } + + @Override + public List<LineageDataset> datasets() { + return this.lineageDatasets; + } + + @Override + public Boundedness boundedness() { + return this.boundedness; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java index 51b101e47b6..c77fc4d154b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java @@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -69,6 +71,7 @@ public class LegacySourceTransformation<T> extends TransformationWithLineage<T> super(name, outputType, parallelism, parallelismConfigured); this.operatorFactory = checkNotNull(SimpleOperatorFactory.of(operator)); this.boundedness = checkNotNull(boundedness); + this.extractLineageVertex(operator); } /** Mutable for legacy sources in the Table API. */ @@ -105,4 +108,11 @@ public class LegacySourceTransformation<T> extends TransformationWithLineage<T> public final void setChainingStrategy(ChainingStrategy strategy) { operatorFactory.setChainingStrategy(strategy); } + + private void extractLineageVertex(StreamSource<T, ?> operator) { + SourceFunction sourceFunction = operator.getUserFunction(); + if (sourceFunction instanceof LineageVertexProvider) { + setLineageVertex(((LineageVertexProvider) sourceFunction).getLineageVertex()); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java index c5190d4477d..18389746a74 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java @@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.api.operators.ChainingStrategy; import javax.annotation.Nullable; @@ -64,6 +65,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT> super(name, outputType, parallelism); this.source = source; this.watermarkStrategy = watermarkStrategy; + this.extractLineageVertex(); } public SourceTransformation( @@ -76,6 +78,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT> super(name, outputType, parallelism, parallelismConfigured); this.source = source; this.watermarkStrategy = watermarkStrategy; + this.extractLineageVertex(); } public Source<OUT, SplitT, EnumChkT> getSource() { @@ -118,4 +121,10 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT> public String getCoordinatorListeningID() { return coordinatorListeningID; } + + private void extractLineageVertex() { + if (source instanceof LineageVertexProvider) { + setLineageVertex(((LineageVertexProvider) source).getLineageVertex()); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java new file mode 100644 index 00000000000..75898fedee5 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Testing for lineage graph util. */ +class LineageGraphUtilsTest { + private static final String SOURCE_DATASET_NAME = "LineageSource"; + private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource"; + private static final String SINK_DATASET_NAME = "LineageSink"; + private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink"; + + private static final String LEGACY_SOURCE_DATASET_NAME = "LineageSourceFunction"; + private static final String LEGACY_SOURCE_DATASET_NAMESPACE = "source://LineageSourceFunction"; + private static final String LEGACY_SINK_DATASET_NAME = "LineageSinkFunction"; + private static final String LEGACY_SINK_DATASET_NAMESPACE = "sink://LineageSinkFunction"; + + @Test + void testExtractLineageGraphFromLegacyTransformations() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Long> source = env.addSource(new LineageSourceFunction()); + DataStreamSink<Long> sink = source.addSink(new LineageSinkFunction()); + + LineageGraph lineageGraph = + LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation())); + + assertThat(lineageGraph.sources().size()).isEqualTo(1); + assertThat(lineageGraph.sources().get(0).boundedness()) + .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1); + assertThat(lineageGraph.sources().get(0).datasets().get(0).name()) + .isEqualTo(LEGACY_SOURCE_DATASET_NAME); + assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace()) + .isEqualTo(LEGACY_SOURCE_DATASET_NAMESPACE); + + assertThat(lineageGraph.sinks().size()).isEqualTo(1); + assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1); + assertThat(lineageGraph.sinks().get(0).datasets().get(0).name()) + .isEqualTo(LEGACY_SINK_DATASET_NAME); + assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace()) + .isEqualTo(LEGACY_SINK_DATASET_NAMESPACE); + + assertThat(lineageGraph.relations().size()).isEqualTo(1); + } + + @Test + void testExtractLineageGraphFromTransformations() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Long> source = + env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), ""); + DataStreamSink<Long> sink = source.sinkTo(new LineageSink()); + + LineageGraph lineageGraph = + LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation())); + + assertThat(lineageGraph.sources().size()).isEqualTo(1); + assertThat(lineageGraph.sources().get(0).boundedness()).isEqualTo(Boundedness.BOUNDED); + assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1); + assertThat(lineageGraph.sources().get(0).datasets().get(0).name()) + .isEqualTo(SOURCE_DATASET_NAME); + assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace()) + .isEqualTo(SOURCE_DATASET_NAMESPACE); + + assertThat(lineageGraph.sinks().size()).isEqualTo(1); + assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1); + assertThat(lineageGraph.sinks().get(0).datasets().get(0).name()) + .isEqualTo(SINK_DATASET_NAME); + assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace()) + .isEqualTo(SINK_DATASET_NAMESPACE); + + assertThat(lineageGraph.relations().size()).isEqualTo(1); + } + + private static class LineageSink extends DiscardingSink<Long> implements LineageVertexProvider { + public LineageSink() { + super(); + } + + @Override + public LineageVertex getLineageVertex() { + LineageDataset lineageDataset = + new DefaultLineageDataset( + SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>()); + DefaultLineageVertex lineageVertex = new DefaultLineageVertex(); + lineageVertex.addLineageDataset(lineageDataset); + return lineageVertex; + } + } + + private static class LineageSource extends NumberSequenceSource + implements LineageVertexProvider { + + public LineageSource(long from, long to) { + super(from, to); + } + + @Override + public LineageVertex getLineageVertex() { + LineageDataset lineageDataset = + new DefaultLineageDataset( + SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>()); + DefaultSourceLineageVertex lineageVertex = + new DefaultSourceLineageVertex(Boundedness.BOUNDED); + lineageVertex.addDataset(lineageDataset); + return lineageVertex; + } + } + + private static class LineageSourceFunction + implements SourceFunction<Long>, LineageVertexProvider { + private volatile boolean running = true; + + @Override + public void run(SourceContext<Long> ctx) throws Exception { + long next = 0; + while (running) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(next++); + } + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public LineageVertex getLineageVertex() { + LineageDataset lineageDataset = + new DefaultLineageDataset( + LEGACY_SOURCE_DATASET_NAME, + LEGACY_SOURCE_DATASET_NAMESPACE, + new HashMap<>()); + DefaultSourceLineageVertex lineageVertex = + new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED); + lineageVertex.addDataset(lineageDataset); + return lineageVertex; + } + } + + private static class LineageSinkFunction implements SinkFunction<Long>, LineageVertexProvider { + + @Override + public LineageVertex getLineageVertex() { + LineageDataset lineageDataset = + new DefaultLineageDataset( + LEGACY_SINK_DATASET_NAME, + LEGACY_SINK_DATASET_NAMESPACE, + new HashMap<>()); + DefaultLineageVertex lineageVertex = new DefaultLineageVertex(); + lineageVertex.addLineageDataset(lineageDataset); + return lineageVertex; + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java index dd58ef184dc..8530aec01c6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent; @@ -36,7 +37,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; +import org.apache.flink.streaming.api.lineage.DefaultLineageVertex; +import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageGraph; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; +import org.apache.flink.streaming.runtime.execution.JobCreatedEvent; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.TestLogger; @@ -51,6 +60,7 @@ import org.junit.runners.MethodSorters; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS; @@ -61,6 +71,11 @@ import static org.assertj.core.api.Assertions.assertThat; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class JobStatusChangedListenerITCase extends TestLogger { private static final int PARALLELISM = 4; + private static final String SOURCE_DATASET_NAME = "LineageSource"; + private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource"; + private static final String SINK_DATASET_NAME = "LineageSink"; + private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink"; + @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); @ClassRule @@ -162,6 +177,7 @@ public class JobStatusChangedListenerITCase extends TestLogger { StreamGraph streamGraph = env.getStreamGraph(); JobGraph jobGraph = streamGraph.getJobGraph(); + verifyLineageGraph(streamGraph.getLineageGraph()); ClusterClient<?> client = MINI_CLUSTER.getClusterClient(); JobID jobID = client.submitJob(jobGraph).get(); waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false); @@ -183,9 +199,35 @@ public class JobStatusChangedListenerITCase extends TestLogger { || (status.oldStatus() == JobStatus.CANCELLING && status.newStatus() == JobStatus.CANCELED)) .isTrue(); + + if (event instanceof JobCreatedEvent) { + LineageGraph lineageGraph = ((JobCreatedEvent) event).lineageGraph(); + assertThat(lineageGraph.sources().size()).isEqualTo(1); + assertThat(lineageGraph.sinks().size()).isEqualTo(1); + } }); } + void verifyLineageGraph(LineageGraph lineageGraph) { + assertThat(lineageGraph.sources().size()).isEqualTo(1); + assertThat(lineageGraph.sources().get(0).boundedness()) + .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1); + assertThat(lineageGraph.sources().get(0).datasets().get(0).name()) + .isEqualTo(SOURCE_DATASET_NAME); + assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace()) + .isEqualTo(SOURCE_DATASET_NAMESPACE); + + assertThat(lineageGraph.sinks().size()).isEqualTo(1); + assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1); + assertThat(lineageGraph.sinks().get(0).datasets().get(0).name()) + .isEqualTo(SINK_DATASET_NAME); + assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace()) + .isEqualTo(SINK_DATASET_NAMESPACE); + + assertThat(lineageGraph.relations().size()).isEqualTo(1); + } + void verifyEventMetaData() { assertThat(statusChangedEvents.size()).isEqualTo(3); assertThat(statusChangedEvents.get(0).jobId()) @@ -238,7 +280,8 @@ public class JobStatusChangedListenerITCase extends TestLogger { public void cancel() {} } - private static class InfiniteLongSourceFunction implements SourceFunction<Long> { + private static class InfiniteLongSourceFunction + implements SourceFunction<Long>, LineageVertexProvider { private volatile boolean running = true; @Override @@ -255,12 +298,33 @@ public class JobStatusChangedListenerITCase extends TestLogger { public void cancel() { running = false; } + + @Override + public LineageVertex getLineageVertex() { + LineageDataset lineageDataset = + new DefaultLineageDataset( + SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>()); + DefaultSourceLineageVertex lineageVertex = + new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED); + lineageVertex.addDataset(lineageDataset); + return lineageVertex; + } } - private static class SleepingSink implements SinkFunction<Long> { + private static class SleepingSink implements SinkFunction<Long>, LineageVertexProvider { @Override public void invoke(Long value, Context context) throws Exception { Thread.sleep(1_000); } + + @Override + public LineageVertex getLineageVertex() { + LineageDataset lineageDataset = + new DefaultLineageDataset( + SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>()); + DefaultLineageVertex lineageVertex = new DefaultLineageVertex(); + lineageVertex.addLineageDataset(lineageDataset); + return lineageVertex; + } } }