各位大佬,我想尝试下pyflink 进行时间窗口下的指标统计,写了一个demo发现table APi 的group
方法报错,网上搜索了一下相关内容也没有解决问题, 想请各位大佬帮帮忙看一下是哪里写错了?
错误信息:
py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time
attribute for grouping in a stream environment.
at
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at
org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at
org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
at
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
at
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
demo程序:
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.descriptors import *
from pyflink.table.descriptors import Json
from pyflink.table.window import *
test_out_put_data_path = r'D:\test_doc\test_result_data.csv'
s_nev = StreamExecutionEnvironment.get_execution_environment()
s_nev.set_parallelism(3)
st_nev = StreamTableEnvironment.create(s_nev,
environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
st_nev.connect(Kafka().version('0.11').topic('gyhWebLog').start_from_earliest().property("zookeeper.connect","cdh3:2181,
cdh4:2181, cdh5:2181").property("bootstrap.servers", "cdh3:9092, cdh4:9092,
cdh5:9092")) \
.with_format(Json()
.fail_on_missing_field(False)
.schema(DataTypes.ROW([DataTypes.FIELD('time',
DataTypes.TIMESTAMP(3)),
DataTypes.FIELD('prev_page',DataTypes.STRING()),
DataTypes.FIELD('page',
DataTypes.STRING()),
DataTypes.FIELD("app",
DataTypes.STRING()),
DataTypes.FIELD("nextApp",DataTypes.STRING()),
DataTypes.FIELD("service",DataTypes.STRING()),
DataTypes.FIELD("userId",DataTypes.BIGINT())])))\
.with_schema(Schema().
field('prev_page', DataTypes.STRING())
.field('page', DataTypes.STRING())
.field('app', DataTypes.STRING())
.field('nextApp', DataTypes.STRING())
.field('service', DataTypes.STRING())
.field('userId', DataTypes.BIGINT())
.field('time', DataTypes.TIMESTAMP(3))
.rowtime(Rowtime()
.timestamps_from_field('time')
.watermarks_periodic_bounded(6)))\
.in_append_mode()\
.create_temporary_table('raw_web_log_data')
st_nev.connect(FileSystem().path(test_out_put_data_path))\
.with_format(OldCsv()
.field_delimiter(',')
.field("userId", DataTypes.BIGINT())
.field('dataCount', DataTypes.BIGINT())
.field('count_time', DataTypes.TIMESTAMP(3))
)\
.with_schema(Schema()
.field('userId', DataTypes.BIGINT())
.field('dataCount', DataTypes.BIGINT())
.field('count_time', DataTypes.TIMESTAMP(3))
)\
.create_temporary_table('test_out_put')
if __name__ == '__main__':
st_nev.from_path('raw_web_log_data').window(Tumble.over('1.hours').on('time').alias('w')).group_by('userId,
w').select('userId, page.count as d, w.end').execute_insert('test_out_put')