环境: flink 1.10,使用flinkSQL kafka输入数据如: {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0} {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0} {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0} {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0} {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0} {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
sql如下: INSERT INTO topic_sink SELECT t, id, speed, LAG(speed, 1) OVER w AS speed_1, LAG(speed, 2) OVER w AS speed_2 FROM topic_source WINDOW w AS ( PARTITION BY id ORDER BY t ) 我期望得到的结果数据是 {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null} {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0, "speed_2":null} {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0, "speed_2":1.0} {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0, "speed_2":2.0} {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0, "speed_2":3.0} {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0, "speed_2":4.0} 实际得到的结果数据是: {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0} {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0, "speed_2":2.0} {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0, "speed_2":3.0} {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0, "speed_2":4.0} {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0, "speed_2":5.0} {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0, "speed_2":6.0} 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写? -- Sent from: http://apache-flink.147419.n8.nabble.com/