Hi，我刚刚在本地完整模拟了你的数据和核心代码，在 sink 里是可以拿到结果的。我把测试代码放在附件里了，你可以参考一下；如果还是不行，可以提供一下你的代码，我再帮你看看。
Best,
Xingbo

秦寒 <han...@chinaums.com> 于2020年4月15日周三 下午3:16写道：
> 你好，
>
> 我在使用 Kafka 生产数据后，在 Python 中用 UDF 实现了一个 add 函数，但最终的 sink 文件里没有任何数据。
> 如果不用 UDF、直接取一个字段，最终的 sink 文件里是有数据的，如下所示。调试了很久也不清楚是什么原因，能否帮忙分析一下？
>
> Kafka 用的测试数据：{"a": "bbb", "b": 3, "c": 1}
>
> 测试结果：
>
> Kafka 用的测试数据：{"a": "bbb", "b": 3, "c": 1}
>
> st_env.from_path("source") \
>     .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
>     .select("add(b1,c1)") \
>     .insert_into("result_tab")    # 无任何输出
>
> 结果：无任何输出
>
> st_env.from_path("source") \
>     .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
>     .select("c1") \
>     .insert_into("result_tab")    # 正常输出
>
> 结果：正确输出
import contextlib
import os

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, FileSystem, OldCsv
from pyflink.table.udf import udf


def test_udf(result_file="/tmp/scalar_func_basic.csv",
             topic="user",
             bootstrap_servers="localhost:9092",
             zookeeper_connect="localhost:2181"):
    """Sum two Kafka JSON fields with a Python UDF and write the sums to a CSV file.

    Pipeline:
      * source  "source"  - Kafka (connector version "0.11") topic *topic*,
        read from the earliest offset, JSON format with fields a, b, c
        declared as strings;
      * query             - cast b and c to LONG, then apply the Python
        scalar UDF ``add`` (BIGINT + BIGINT -> BIGINT);
      * sink    "Results" - a CsvTableSink with one BIGINT column ``a``
        written to *result_file*.

    All parameters default to the values originally hard-coded here.

    Args:
        result_file: Path of the CSV output; any existing file is deleted first.
        topic: Kafka topic to consume.
        bootstrap_servers: Kafka ``bootstrap.servers`` property.
        zookeeper_connect: Kafka ``zookeeper.connect`` property.
    """
    s_env = StreamExecutionEnvironment.get_execution_environment()
    # NOTE(review): no rowtime attribute is declared in the schema below, so
    # event time appears unused by this job - confirm whether it is needed.
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    # A single task makes the CSV sink produce one output file.
    s_env.set_parallelism(1)
    st_env = StreamTableEnvironment.create(s_env)

    # Delete a leftover result file from a previous run. EAFP avoids the
    # exists()/remove() race of a check-then-act sequence.
    with contextlib.suppress(FileNotFoundError):
        os.remove(result_file)

    st_env.register_table_sink(
        "Results",
        CsvTableSink(['a'], [DataTypes.BIGINT()], result_file))

    # Python scalar UDF: two BIGINT arguments, BIGINT result.
    st_env.register_function(
        "add",
        udf(lambda i, j: i + j,
            [DataTypes.BIGINT(), DataTypes.BIGINT()],
            DataTypes.BIGINT()))

    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
            .version("0.11")
            .topic(topic)
            .start_from_earliest()
            .property("zookeeper.connect", zookeeper_connect)
            .property("bootstrap.servers", bootstrap_servers)
        ) \
        .with_format(  # declare a format for this system
            Json()
            .fail_on_missing_field(True)
            # b and c are declared as strings here and cast to LONG in the
            # query below before being passed to the UDF.
            .json_schema(
                "{"
                "  type: 'object',"
                "  properties: {"
                "    a: {"
                "      type: 'string'"
                "    },"
                "    b: {"
                "      type: 'string'"
                "    },"
                "    c: {"
                "      type: 'string'"
                "    }"
                "  }"
                "}"
            )
        ) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("a", DataTypes.STRING())
            .field("b", DataTypes.STRING())
            .field("c", DataTypes.STRING())
        ) \
        .in_append_mode() \
        .register_table_source("source")

    st_env.from_path("source") \
        .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
        .select("add(b1, c1)") \
        .insert_into("Results")

    # NOTE(review): no end offset is configured for the Kafka source, so this
    # call presumably runs until the job is cancelled - confirm.
    st_env.execute("test")


if __name__ == '__main__':
    test_udf()