Aljoscha Krettek created FLINK-6116:
---------------------------------------
Summary: Watermarks don't work when unioning with same DataStream
Key: FLINK-6116
URL: https://issues.apache.org/jira/browse/FLINK-6116
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.2.0, 1.3.0
Reporter: Aljoscha Krettek
In the following example job, the {{WatermarkObserver}} operator never receives any watermarks:
{code}
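import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Ingestion time: the source assigns timestamps and emits watermarks automatically.
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.getConfig().setAutoWatermarkInterval(1000);
        env.setParallelism(1);

        // Emits one element every 800 ms until the job is stopped externally.
        DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect("hello!");
                    Thread.sleep(800);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Union the stream with itself; with this in place no watermarks are printed.
        input.union(input)
            .flatMap(new IdentityFlatMap())
            .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());

        env.execute();
    }

    /** Prints every element and every watermark that reaches it. */
    public static class WatermarkObserver
            extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            System.out.println("GOT ELEMENT: " + element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println("GOT WATERMARK: " + mark);
        }
    }

    /** Forwards each element unchanged. */
    private static class IdentityFlatMap
            extends RichFlatMapFunction<String, String> {

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            out.collect(value);
        }
    }
}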
{code}
When the {{union}} call is commented out, the {{WatermarkObserver}} receives watermarks as expected.
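
For context, an operator with several input channels in Flink only advances its watermark to the minimum watermark seen across all of those channels. The plain-Java sketch below (no Flink dependency; {{MinWatermarkSketch}} and its methods are hypothetical names, not Flink API) shows how one channel that never receives a watermark would hold the combined watermark back indefinitely. Whether this is what actually happens with {{input.union(input)}} is an assumption; this report does not confirm the root cause.

```java
import java.util.Arrays;

/** Hypothetical illustration of min-across-channels watermark tracking; not Flink code. */
public class MinWatermarkSketch {

    // Last watermark seen per input channel; Long.MIN_VALUE means "none yet".
    private final long[] channelWatermarks;

    // Last combined watermark that was forwarded downstream.
    private long lastEmitted = Long.MIN_VALUE;

    public MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark arriving on one channel.
     * Returns true if the combined (minimum) watermark advanced as a result.
     */
    public boolean onWatermark(int channel, long timestamp) {
        if (timestamp > channelWatermarks[channel]) {
            channelWatermarks[channel] = timestamp;
        }
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return true;
        }
        return false;
    }

    public long currentWatermark() {
        return lastEmitted;
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        // Only channel 0 reports watermarks: the combined watermark does not move.
        System.out.println(op.onWatermark(0, 1000L));
        System.out.println(op.onWatermark(0, 2000L));
        // Once channel 1 reports as well, the minimum of both is forwarded.
        System.out.println(op.onWatermark(1, 1500L));
        System.out.println(op.currentWatermark());
    }
}
```

In this sketch the first two calls leave the combined watermark untouched because channel 1 has not reported anything yet; only the third call lets it advance.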