Hi all, I have a question:
I'm creating a Kafka source with a SQL DDL in PyFlink, as follows:
    kafka_source_ddl = """
    CREATE TABLE kafka_source_tb (
     name VARCHAR,
     number INT,
     msgtime TIMESTAMP,
     WATERMARK FOR msgtime AS msgtime
    ) WITH (
     'connector.type' = 'kafka',
     'connector.version' = 'universal',
     'connector.topic' = 'mytopic',
     'connector.properties.zookeeper.connect' = 'localhost:2181',
     'connector.properties.bootstrap.servers' = 'localhost:9092',
     'format.type' = 'json',
     'format.derive-schema' = 'true'
    )
    """
    st_env.sql_update(kafka_source_ddl)
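For context, my producer serializes records roughly like this (a plain-Python sketch, not my actual producer code; the RFC 3339 timestamp string is my assumption about what 'format.type' = 'json' expects for a SQL TIMESTAMP column):

```python
import json
from datetime import datetime

def make_record(name, number, event_time):
    """Build one JSON message matching the kafka_source_tb schema.

    With 'format.derive-schema' = 'true' the JSON fields are derived from
    the DDL, so the keys must match the column names exactly.
    """
    return json.dumps({
        "name": name,                                          # VARCHAR
        "number": number,                                      # INT
        # Assumed timestamp layout; my real producer may differ.
        "msgtime": event_time.strftime("%Y-%m-%dT%H:%M:%SZ"),  # TIMESTAMP
    })

sample = make_record("a", 1, datetime(2020, 5, 1, 12, 0, 0))
```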


When I use a window on it, I get an error. The code is:
    st_env.from_path("kafka_source_tb") \
          .window(Slide.over("10.seconds").every("1.seconds").on("msgtime").alias("msgtime")) \
          .group_by("msgtime") \
          .select("msgtime.start as b, msgtime.end as c, msgtime.rowtime as d")
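For reference, this is the sliding-window behaviour I expect from Slide.over("10.seconds").every("1.seconds"): each element should fall into size/slide = 10 overlapping windows. A plain-Python sketch of the assignment logic (mirroring, as far as I understand it, Flink's TimeWindow.getWindowStartWithOffset; not PyFlink code):

```python
def sliding_windows(ts_ms, size_ms=10_000, slide_ms=1_000, offset_ms=0):
    """Return the [start, end) millisecond spans of every sliding window
    that contains an element with timestamp ts_ms."""
    # Start of the latest window that contains ts_ms.
    last_start = ts_ms - (ts_ms - offset_ms + slide_ms) % slide_ms
    windows = []
    start = last_start
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows
```

For example, an element at t = 12_345 ms lands in ten windows, from (3_000, 13_000) up to (12_000, 22_000).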


The error is:

    org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
        at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
        at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
        at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
        at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
        at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
        at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
        at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)


I saw someone say that using the Table API's rowtime function directly seems to have a bug, which is why I went with the DDL approach instead. Is this error related to that, or did I write something wrong somewhere?


Could anyone take a look?
Thanks!
