Aljoscha Krettek created FLINK-6116:
---------------------------------------

             Summary: Watermarks don't work when unioning with same DataStream
                 Key: FLINK-6116
                 URL: https://issues.apache.org/jira/browse/FLINK-6116
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.2.0, 1.3.0
            Reporter: Aljoscha Krettek


In this example job we don't get any watermarks in the {{WatermarkObserver}}:
{code}
public class WatermarkTest {
        public static void main(String[] args) throws Exception {

                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
                env.getConfig().setAutoWatermarkInterval(1000);
                env.setParallelism(1);

                DataStreamSource<String> input = env.addSource(new 
SourceFunction<String>() {
                        @Override
                        public void run(SourceContext<String> ctx) throws 
Exception {
                                while (true) {
                                        ctx.collect("hello!");
                                        Thread.sleep(800);
                                }
                        }

                        @Override
                        public void cancel() {

                        }
                });

                input.union(input)
                                .flatMap(new IdentityFlatMap())
                                .transform("WatermarkOp", 
BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());

                env.execute();
        }

        public static class WatermarkObserver
                        extends AbstractStreamOperator<String>
                        implements OneInputStreamOperator<String, String> {
                @Override
                public void processElement(StreamRecord<String> element) throws 
Exception {
                        System.out.println("GOT ELEMENT: " + element);
                }

                @Override
                public void processWatermark(Watermark mark) throws Exception {
                        super.processWatermark(mark);

                        System.out.println("GOT WATERMARK: " + mark);
                }
        }

        private static class IdentityFlatMap
                        extends RichFlatMapFunction<String, String> {
                @Override
                public void flatMap(String value, Collector<String> out) throws 
Exception {
                        out.collect(value);
                }
        }
}
{code}

When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to