Hi Lucas, 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
你可以尝试将sql语句改成以下形式: select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1) from `some_source` group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf” Best, Wei > 在 2020年12月13日,13:13,Lucas <guoliubi...@foxmail.com> 写道: > > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下 > > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()], > result_type=DataTypes.ROW( > [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()), > DataTypes.FIELD('aveBuy', DataTypes.INT())), > func_type='pandas') > def orderCalc(code, amount): > > df = pd.DataFrame({'code': code, 'amount': amount}) > # pandas 数据处理后输入另一个dataframe output > return (output['buyQtl'], output['aveBuy']) > > > 定义了csv的sink如下 > > create table csvSink ( > buyQtl BIGINT, > aveBuy INT > ) with ( > 'connector.type' = 'filesystem', > 'format.type' = 'csv', > 'connector.path' = 'e:/output' > ) > > > > 然后进行如下的操作: > > result_table = t_env.sql_query(""" > select orderCalc(code, amount) > from `some_source` > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount > """) > result_table.execute_insert("csvSink") > > > > 在执行程序的时候提示没法入库 > > py4j.protocol.Py4JJavaError: An error occurred while calling > o98.executeInsert. > > : org.apache.flink.table.api.ValidationException: Column types of query > result and sink for registered table > 'default_catalog.default_database.csvSink' do not match. > > Cause: Different number of columns. > > > > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >] > > Sink schema: [buyQtl: BIGINT, aveBuy: INT] > > at > org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx > ception(DynamicSinkUtils.java:304) > > at > org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply > ImplicitCast(DynamicSinkUtils.java:134) > > > > 是UDF的输出结构不对吗,还是需要调整sink table的结构? >