[ https://issues.apache.org/jira/browse/FLINK-23177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Spongebob updated FLINK-23177: ------------------------------ Description: I assigned watermarks in a dataStream and then used the `createTemporaryView` method to build a table that is sourced from the dataStream. Contrary to my expectation, the watermarks work normally in the dataStream, but the watermarks of the table stay at -9223372036854775808 forever. {code:java} def main(args: Array[String]): Unit = { val streamEnv = ... streamEnv.enableCheckpointing(1000) streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) val dataStream = ... val resultStream = dataStream.map( value => { val data = value.split(",") (data(0), data(1).toInt) } ).assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] { override def extractTimestamp(element: (String, Int), recordTimestamp: Long): Long = element._2 * 1000 })) .process(new MyProcessFunc) // resultStream.print("raw") // streamEnv.execute("") val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) streamTableEnv.createTemporaryView("catalog_test1", resultStream) ... } {code} was: I assigned watermarks in a dataStream and then used the `createTemporaryView` method to build a table that is sourced from the dataStream. Contrary to my expectation, the watermarks work normally in the dataStream, but the watermarks of the table stay at -9223372036854775808 forever. 
{code:java} def main(args: Array[String]): Unit = { val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.enableCheckpointing(1000) streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) val dataStream = streamEnv.socketTextStream("192.168.164.105", 9999) val resultStream = dataStream.map( value => { val data = value.split(",") (data(0), data(1).toInt) } ).assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] { override def extractTimestamp(element: (String, Int), recordTimestamp: Long): Long = element._2 * 1000 })) .process(new MyProcessFunc) // resultStream.print("raw") // streamEnv.execute("") val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) streamTableEnv.createTemporaryView("catalog_test1", resultStream) val catalog = buildHiveCatalog streamTableEnv.registerCatalog("hive", catalog) streamTableEnv.useCatalog("hive") streamTableEnv.executeSql("insert into test1 select _1,_2 from default_catalog.default_database.catalog_test1").print() } {code} > watermarks generated in dataStream can not flow into table > ---------------------------------------------------------- > > Key: FLINK-23177 > URL: https://issues.apache.org/jira/browse/FLINK-23177 > Project: Flink > Issue Type: Bug > Components: API / Core > Affects Versions: 1.13.1 > Environment: flink: 1.13.1 > Reporter: Spongebob > Priority: Major > > I assigned watermarks in a dataStream and then used the > `createTemporaryView` method to build a table that is sourced from the > dataStream. Contrary to my expectation, the watermarks work normally in the > dataStream, but the watermarks of the table stay at -9223372036854775808 > forever. > {code:java} > def main(args: Array[String]): Unit = { > val streamEnv = ... 
> streamEnv.enableCheckpointing(1000) > streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000)) > val dataStream = ... > val resultStream = dataStream.map( > value => { > val data = value.split(",") > (data(0), data(1).toInt) > } > ).assignTimestampsAndWatermarks(WatermarkStrategy > .forBoundedOutOfOrderness(Duration.ZERO) > .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] > { > override def extractTimestamp(element: (String, Int), > recordTimestamp: Long): Long = element._2 * 1000 > })) > .process(new MyProcessFunc) > // resultStream.print("raw") > // streamEnv.execute("") > val streamTableEnv = buildStreamTableEnv(streamEnv, > EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()) > streamTableEnv.createTemporaryView("catalog_test1", resultStream) > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)