我的原SQL:
-- Kafka-backed source table for consumer-session-created events, decoded with
-- the Confluent Avro format. `ts` is the event-time attribute: it is computed
-- from eventInfo.eventTime and carries a watermark that trails it by 5 seconds.
CREATE TABLE consumer_session_created (
    consumer ROW<consumerUuid STRING>,
    clientIp STRING,
    deviceId STRING,
    eventInfo ROW<eventTime BIGINT>,
    -- eventTime is divided by 1000 before FROM_UNIXTIME, so it is presumably
    -- epoch milliseconds; the resulting ts has second precision.
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime / 1000, 'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'local.dwh.paylater.consumer.session.consumer-session-created.v1',
    'properties.bootstrap.servers' = 'http://localhost:9092',
    -- Fixed: the key was ' properties.group.id' (leading space). With the space
    -- it no longer starts with the 'properties.' pass-through prefix, so the
    -- consumer group id was never handed to the Kafka client as intended.
    'properties.group.id' = 'flink-ato-trusted-consumer',
    'scan.startup.mode' = 'latest-offset',
    'properties.allow.auto.create.topics' = 'false',
    'format' = 'avro-confluent',
    -- NOTE(review): the two values below are the literal string 'null', not an
    -- absent setting. If no basic auth is needed, consider removing both
    -- options instead -- confirm against the Schema Registry setup.
    'avro-confluent.basic-auth.credentials-source' = 'null',
    'avro-confluent.basic-auth.user-info' = 'null',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'avro-confluent.schema-registry.subject' = 'local.dwh.paylater.consumer.session.consumer-session-created.v1'
)
-- Flattens the nested session-created record into plain columns.
-- `ts` (the table's event-time attribute) is now exposed as well, so that
-- downstream OVER windows can order by it.
CREATE TEMPORARY VIEW consumer_session_created_detail AS (
    SELECT
        csc.consumer.consumerUuid AS consumer_consumerUuid,
        csc.deviceId AS deviceId,
        csc.clientIp AS clientIp,
        csc.eventInfo.eventTime AS eventInfo_eventTime,
        csc.ts AS ts
    FROM consumer_session_created csc
)

-- Per consumer: number of distinct devices and distinct client IPs seen in the
-- hour up to and including the current event.
SELECT
    consumer_consumerUuid AS entity_id,
    COUNT(DISTINCT deviceId) OVER w AS sp_c_distinct_device_cnt_by_consumer_id_h1_0,
    COUNT(DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0
FROM consumer_session_created_detail
WINDOW w AS (
    PARTITION BY consumer_consumerUuid
    -- Fixed: this was ORDER BY eventInfo_eventTime, a BIGINT. A RANGE frame
    -- bounded by an INTERVAL needs a timestamp ORDER BY key, which is what
    -- raised "Data Type mismatch between ORDER BY and RANGE clause".
    -- Ordering by the event-time attribute ts resolves it.
    ORDER BY ts
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)

报的错: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 9, column 15 to line 9, column 31: Data Type mismatch between ORDER BY and RANGE clause at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:207) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:715) at aptflinkjobs.stream.SQLStreamer.lambda$execute$1(SQLStreamer.java:149) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at aptflinkjobs.stream.SQLStreamer.execute(SQLStreamer.java:141) at aptflinkjobs.stream.SQLStreamer.main(SQLStreamer.java:296) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)