[ 
https://issues.apache.org/jira/browse/FLINK-23177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-23177:
------------------------------
    Description: 
I assigned watermarks on a DataStream and then used the `createTemporaryView` 
method to build a table sourced from that DataStream. Contrary to my 
expectation, the watermarks work normally in the DataStream, but the watermarks 
of the table stay at -9223372036854775808 (Long.MinValue) forever.
{code:java}
def main(args: Array[String]): Unit = {
    val streamEnv = ...
    streamEnv.enableCheckpointing(1000)
    streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
    val dataStream = ...

    val resultStream = dataStream.map(
      value => {
        val data = value.split(",")
        (data(0), data(1).toInt)
      }
    ).assignTimestampsAndWatermarks(WatermarkStrategy
      .forBoundedOutOfOrderness(Duration.ZERO)
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
        override def extractTimestamp(element: (String, Int), recordTimestamp: 
Long): Long = element._2 * 1000
      }))
      .process(new MyProcessFunc)

//    resultStream.print("raw")
//    streamEnv.execute("")

    val streamTableEnv = buildStreamTableEnv(streamEnv, 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
    streamTableEnv.createTemporaryView("catalog_test1", resultStream)
    ...
  }
{code}

  was:
I assigned watermarks on a DataStream and then used the `createTemporaryView` 
method to build a table sourced from that DataStream. Contrary to my 
expectation, the watermarks work normally in the DataStream, but the watermarks 
of the table stay at -9223372036854775808 (Long.MinValue) forever.
{code:java}
def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.enableCheckpointing(1000)
    streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
    val dataStream = streamEnv.socketTextStream("192.168.164.105", 9999)

    val resultStream = dataStream.map(
      value => {
        val data = value.split(",")
        (data(0), data(1).toInt)
      }
    ).assignTimestampsAndWatermarks(WatermarkStrategy
      .forBoundedOutOfOrderness(Duration.ZERO)
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
        override def extractTimestamp(element: (String, Int), recordTimestamp: 
Long): Long = element._2 * 1000
      }))
      .process(new MyProcessFunc)

//    resultStream.print("raw")
//    streamEnv.execute("")

    val streamTableEnv = buildStreamTableEnv(streamEnv, 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
    streamTableEnv.createTemporaryView("catalog_test1", resultStream)
    val catalog = buildHiveCatalog
    streamTableEnv.registerCatalog("hive", catalog)
    streamTableEnv.useCatalog("hive")
    streamTableEnv.executeSql("insert into test1 select _1,_2 from 
default_catalog.default_database.catalog_test1").print()
  }
{code}


> watermarks generated in dataStream can not flow into table
> ----------------------------------------------------------
>
>                 Key: FLINK-23177
>                 URL: https://issues.apache.org/jira/browse/FLINK-23177
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.13.1
>         Environment: flink: 1.13.1
>            Reporter: Spongebob
>            Priority: Major
>
> I assigned watermarks on a DataStream and then used the 
> `createTemporaryView` method to build a table sourced from that 
> DataStream. Contrary to my expectation, the watermarks work normally in the 
> DataStream, but the watermarks of the table stay at -9223372036854775808 
> (Long.MinValue) forever.
> {code:java}
> def main(args: Array[String]): Unit = {
>     val streamEnv = ...
>     streamEnv.enableCheckpointing(1000)
>     streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000))
>     val dataStream = ...
>     val resultStream = dataStream.map(
>       value => {
>         val data = value.split(",")
>         (data(0), data(1).toInt)
>       }
>     ).assignTimestampsAndWatermarks(WatermarkStrategy
>       .forBoundedOutOfOrderness(Duration.ZERO)
>       .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] 
> {
>         override def extractTimestamp(element: (String, Int), 
> recordTimestamp: Long): Long = element._2 * 1000
>       }))
>       .process(new MyProcessFunc)
> //    resultStream.print("raw")
> //    streamEnv.execute("")
>     val streamTableEnv = buildStreamTableEnv(streamEnv, 
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
>     streamTableEnv.createTemporaryView("catalog_test1", resultStream)
>     ...
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to