I'm guessing eventInfo_eventTime is a BIGINT?
ORDER BY combined with RANGE needs a TIMESTAMP type, so you have to convert it with a computed column.
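
The DDL quoted below already declares a computed column ts with a watermark, so one option might be to select ts in the view and order the window by it instead of the raw BIGINT. A rough sketch, assuming the view can be changed and that ts stays an event-time attribute when passed through the view (I have not run this against your Flink version):

```sql
-- Sketch only: expose the existing computed column `ts` through the view.
CREATE TEMPORARY VIEW consumer_session_created_detail AS
SELECT
  csc.consumer.consumerUuid AS consumer_consumerUuid,
  csc.deviceId AS deviceId,
  csc.clientIp AS clientIp,
  csc.ts AS ts  -- computed from eventInfo.eventTime in the table DDL
FROM consumer_session_created csc;

-- Order the OVER window by the timestamp column rather than the BIGINT,
-- so the ORDER BY type matches the INTERVAL used in the RANGE clause.
SELECT
  consumer_consumerUuid AS entity_id,
  COUNT(DISTINCT deviceId) OVER w AS sp_c_distinct_device_cnt_by_consumer_id_h1_0,
  COUNT(DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0
FROM consumer_session_created_detail
WINDOW w AS (
  PARTITION BY consumer_consumerUuid
  ORDER BY ts
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
);
```

All table, view, and column names are taken from your message; only the ts column in the view and the ORDER BY ts are new.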

At 2021-12-24 16:38:00, "Pinjie Huang" <pinjie.hu...@afterpay.com.INVALID> wrote:
>My original SQL:
>CREATE TABLE consumer_session_created
>(
>consumer ROW (consumerUuid STRING),
>clientIp STRING,
>deviceId STRING,
>eventInfo ROW < eventTime BIGINT >,
>ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime / 1000, 'yyyy-MM-dd HH:mm:ss')),
>WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>) WITH ( 'connector'='kafka'
>,'topic'='local.dwh.paylater.consumer.session.consumer-session-created.v1'
>,'properties.bootstrap.servers'='http://localhost:9092'
>,'properties.group.id'='flink-ato-trusted-consumer'
>,'scan.startup.mode'='latest-offset'
>,'properties.allow.auto.create.topics'='false'
>,'format'='avro-confluent'
>,'avro-confluent.basic-auth.credentials-source'='null'
>,'avro-confluent.basic-auth.user-info'='null'
>,'avro-confluent.schema-registry.url'='http://localhost:8081'
>,'avro-confluent.schema-registry.subject'='local.dwh.paylater.consumer.session.consumer-session-created.v1')
>
>CREATE
>TEMPORARY VIEW consumer_session_created_detail as (
>SELECT
>csc.consumer.consumerUuid as consumer_consumerUuid,
>csc.deviceId as deviceId,
>csc.clientIp as clientIp,
>csc.eventInfo.eventTime as eventInfo_eventTime
>FROM consumer_session_created csc
>)
>
>SELECT
>consumer_consumerUuid AS entity_id,
>COUNT(DISTINCT deviceId) OVER w AS sp_c_distinct_device_cnt_by_consumer_id_h1_0,
>COUNT (DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0
>FROM consumer_session_created_detail
>WINDOW w AS (
>PARTITION BY consumer_consumerUuid
>ORDER BY eventInfo_eventTime
>RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>)
>
>The error I get:
>
>org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 9, column 15 to line 9, column 31: Data Type mismatch between ORDER BY and RANGE clause
>at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
>at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:207)
>at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:715)
>at aptflinkjobs.stream.SQLStreamer.lambda$execute$1(SQLStreamer.java:149)
>at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>at aptflinkjobs.stream.SQLStreamer.execute(SQLStreamer.java:141)
>at aptflinkjobs.stream.SQLStreamer.main(SQLStreamer.java:296)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
>at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
