I'm using Flink 1.12.0 with Python 3.7. I defined a pandas UDF roughly as follows:

```python
@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.ROW([DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
                                DataTypes.FIELD('aveBuy', DataTypes.INT())]),
     func_type='pandas')
def orderCalc(code, amount):
    df = pd.DataFrame({'code': code, 'amount': amount})
    # ... pandas processing produces another DataFrame `output` ...
    return (output['buyQtl'], output['aveBuy'])
```

The CSV sink is defined as:

```sql
create table csvSink (
    buyQtl BIGINT,
    aveBuy INT
) with (
    'connector.type' = 'filesystem',
    'format.type' = 'csv',
    'connector.path' = 'e:/output'
)
```

Then I run:

```python
result_table = t_env.sql_query("""
    select orderCalc(code, amount)
    from `some_source`
    group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
""")
result_table.execute_insert("csvSink")
```

Executing the program fails with the following error, saying the result cannot be written to the sink:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o98.executeInsert.
: org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.csvSink' do not match.
Cause: Different number of columns.

Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT>]
Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
	at org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:304)
	at org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:134)
```

Is the UDF's output structure wrong, or do I need to adjust the sink table's schema?
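For context on what the error is saying: the query yields a single ROW-typed column (`EXPR$0: ROW<buyQtl BIGINT, aveBuy INT>`), while the sink declares two flat columns, hence "Different number of columns". One possible fix, sketched below and not verified against this exact setup, is to flatten the row in an outer query using Flink SQL's dot-notation field access on ROW values (the alias `r` is introduced here for illustration; `orderCalc`, `some_source`, and the field names come from the code above):

```sql
-- Sketch: alias the UDF's ROW result, then project its fields into
-- the two flat columns that csvSink's schema expects.
INSERT INTO csvSink
SELECT t.r.buyQtl, t.r.aveBuy
FROM (
    SELECT orderCalc(code, amount) AS r
    FROM `some_source`
    GROUP BY TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
) t
```

The alternative is to keep the query as-is and declare the sink with a single ROW-typed column, but flattening in the query is usually the cleaner option for a CSV sink.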