eventInfo_eventTime 我猜测是 BIGINT 类型的吧? order by | range 需要用到 timestamp 类型,需要用计算列转换一下
At 2021-12-24 16:38:00, "Pinjie Huang" <pinjie.hu...@afterpay.com.INVALID> wrote: >我的原SQL: >CREATE TABLE consumer_session_created >( >consumer ROW (consumerUuid STRING), >clientIp STRING, >deviceId STRING, >eventInfo ROW < eventTime BIGINT >, >ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime / 1000, 'yyyy-MM-dd >HH:mm:ss')), >WATERMARK FOR ts AS ts - INTERVAL '5' SECOND >) WITH ( 'connector'='kafka' >,'topic'='local.dwh.paylater.consumer.session.consumer-session-created.v1' >,'properties.bootstrap.servers'='http://localhost:9092' ,' >properties.group.id'='flink-ato-trusted-consumer' >,'scan.startup.mode'='latest-offset' >,'properties.allow.auto.create.topics'='false' ,'format'='avro-confluent' >,'avro-confluent.basic-auth.credentials-source'='null' >,'avro-confluent.basic-auth.user-info'='null' >,'avro-confluent.schema-registry.url'='http://localhost:8081' >,'avro-confluent.schema-registry.subject'='local.dwh.paylater.consumer.session.consumer-session-created.v1') > >CREATE >TEMPORARY VIEW consumer_session_created_detail as ( >SELECT >csc.consumer.consumerUuid as consumer_consumerUuid, >csc.deviceId as deviceId, >csc.clientIp as clientIp, >csc.eventInfo.eventTime as eventInfo_eventTime >FROM consumer_session_created csc >) > >SELECT >consumer_consumerUuid AS entity_id, >COUNT(DISTINCT deviceId) OVER w AS >sp_c_distinct_device_cnt_by_consumer_id_h1_0, >COUNT (DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0 >FROM consumer_session_created_detail >WINDOW w AS ( >PARTITION BY consumer_consumerUuid >ORDER BY eventInfo_eventTime >RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW >) > >报的错: > >org.apache.flink.client.program.ProgramInvocationException: The main method >caused an error: org.apache.flink.table.api.ValidationException: SQL >validation failed. From line 9, column 15 to line 9, column 31: Data Type >mismatch between ORDER BY and RANGE clause > >at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156) > >at >org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) > >at >org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:207) > >at >org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) > >at >org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:715) > >at aptflinkjobs.stream.SQLStreamer.lambda$execute$1(SQLStreamer.java:149) > >at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > >at aptflinkjobs.stream.SQLStreamer.execute(SQLStreamer.java:141) > >at aptflinkjobs.stream.SQLStreamer.main(SQLStreamer.java:296) > >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >at >sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >at >sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >at java.lang.reflect.Method.invoke(Method.java:498) > >at >org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) > >at >org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)