[ https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327278#comment-16327278 ]
ASF GitHub Bot commented on FLINK-6116: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4649#discussion_r161791264 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID, StreamNode upstreamNode = getStreamNode(upStreamVertexID); StreamNode downstreamNode = getStreamNode(downStreamVertexID); + Tuple2<Integer, Integer> edgePair = new Tuple2<>(upstreamNode.getId(), downstreamNode.getId()); + if (!uniqueEdgeMap.containsKey(edgePair)) { --- End diff -- ``` uniqueEdgeMap.put(edgePair, uniqueEdgeMap.getOrDefault(edgePair, 0) + 1) ``` > Watermarks don't work when unioning with same DataStream > -------------------------------------------------------- > > Key: FLINK-6116 > URL: https://issues.apache.org/jira/browse/FLINK-6116 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.2.0, 1.3.0 > Reporter: Aljoscha Krettek > Priority: Critical > > In this example job we don't get any watermarks in the {{WatermarkObserver}}: > {code} > public class WatermarkTest { > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); > env.getConfig().setAutoWatermarkInterval(1000); > env.setParallelism(1); > DataStreamSource<String> input = env.addSource(new > SourceFunction<String>() { > @Override > public void run(SourceContext<String> ctx) throws > Exception { > while (true) { > ctx.collect("hello!"); > Thread.sleep(800); > } > } > @Override > public void cancel() { > } > }); > input.union(input) > .flatMap(new IdentityFlatMap()) > .transform("WatermarkOp", > BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver()); > env.execute(); > } > public static class WatermarkObserver > extends AbstractStreamOperator<String> > implements OneInputStreamOperator<String, String> { > @Override > public void processElement(StreamRecord<String> element) throws > Exception { > System.out.println("GOT ELEMENT: " + element); > } > @Override > public void processWatermark(Watermark mark) throws Exception { > super.processWatermark(mark); > System.out.println("GOT WATERMARK: " + mark); > } > } > private static class IdentityFlatMap > extends RichFlatMapFunction<String, String> { > @Override > public void flatMap(String value, Collector<String> out) throws > Exception { > out.collect(value); > } > } > } > {code} > When commenting out the `union` it works. -- This message was sent by Atlassian JIRA (v7.6.3#76005)