[ https://issues.apache.org/jira/browse/FLINK-21165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zlzhang0122 updated FLINK-21165:
--------------------------------
    Description:
When assignTimestampsAndWatermarks is called as a separate statement and the chaining condition is satisfied, the Timestamps/Watermarks operator is erroneously chained with the previous operator.

For example, I have code like the following:
{code:java}
DataStream<UserAction> ds = dataSource.map(xxx).setParallelism(1);
// The returned stream is discarded here; ds still refers to the map output.
ds.assignTimestampsAndWatermarks(xxxx);
DataStream<UserAction> rsStream = ds.keyBy(xxx).timeWindow(xxx).sum(xxx).setParallelism(2);
{code}
This produces the JobGraph below.

!Lark20210127-203541.png!

If I change the code to this:
{code:java}
DataStream<UserAction> ds = dataSource.map(xxx).setParallelism(1);
// The returned stream is kept and used for the downstream operators.
DataStream<UserAction> watermarkStream = ds.assignTimestampsAndWatermarks(xxxx);
DataStream<UserAction> rsStream = watermarkStream.keyBy(xxx).timeWindow(xxx).sum(xxx).setParallelism(2);
{code}
I get the same JobGraph, but the actual execution result is different.

I think the first JobGraph is misleading and should not be displayed like that. Perhaps the Timestamps/Watermarks operator should be shown separately and should not be chained with the previous operator.

> Timestamps/Watermarks chained erroneously with previous operator
> ----------------------------------------------------------------
>
>                 Key: FLINK-21165
>                 URL: https://issues.apache.org/jira/browse/FLINK-21165
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: zlzhang0122
>            Priority: Major
>         Attachments: Lark20210127-203541.png
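
One possible reading of why the two snippets show the same operators but behave differently is sketched below. This is a toy model, not Flink code: the Node class, its then method, the build helper, and the shared registry list are all hypothetical names invented for illustration. The sketch assumes only that each transformation call returns a new stream object and also registers its operator with the environment. Under that assumption, a call whose result is discarded still appears in the graph, but it is not upstream of the operators built afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class DiscardedResultSketch {

    /** Toy stand-in for a stream node. Hypothetical; this is NOT Flink's DataStream. */
    static final class Node {
        final String name;
        final Node parent;
        // Shared list that plays the role of the environment's operator list (assumption).
        final List<Node> registry;

        Node(String name, Node parent, List<Node> registry) {
            this.name = name;
            this.parent = parent;
            this.registry = registry;
            registry.add(this); // every created operator is registered, used or not
        }

        /** Returns a NEW node downstream of this one; this node itself is unchanged. */
        Node then(String opName) {
            return new Node(opName, this, registry);
        }

        /** Upstream path from the source to this node. */
        String path() {
            return parent == null ? name : parent.path() + " -> " + name;
        }
    }

    /** Builds the toy pipeline, either keeping or discarding the watermark node. */
    static Node build(boolean keepWatermarkStream) {
        List<Node> registry = new ArrayList<>();
        Node ds = new Node("Source", null, registry).then("Map");
        if (keepWatermarkStream) {
            Node watermarkStream = ds.then("Timestamps/Watermarks");
            return watermarkStream.then("Window");
        }
        ds.then("Timestamps/Watermarks"); // result discarded, as in the first snippet
        return ds.then("Window");
    }

    public static void main(String[] args) {
        Node discarded = build(false);
        Node kept = build(true);
        // Both variants register the same number of operators...
        System.out.println(discarded.registry.size() + " vs " + kept.registry.size());
        // ...but only the second has the watermark node upstream of the window.
        System.out.println(discarded.path());
        System.out.println(kept.path());
    }
}
```

In this model both variants register four operators, which is consistent with the two JobGraphs looking alike, while only the second variant places Timestamps/Watermarks on the path that feeds the window. Whether this fully explains the attached JobGraph would need to be checked against Flink itself.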