Hi,
我刚刚在本地完全模拟了你的数据和核心的代码,是可以在sink里拿到结果的。
我把我的测试代码放到附件里面了,
你可以参考一下,如果还是不行的话,可以提供下你的代码再帮你看一下

Best,
Xingbo


秦寒 <han...@chinaums.com> 于2020年4月15日周三 下午3:16写道:

> 你好
>
>        我在使用kafka produce数据后,在python中使用UDF做一个add function,但是最后的sink
> 文件里面没有任何数据,
>
> 如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG很久也不清楚是什么原因是否能帮忙分下
>
>
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
>
>
> *测试结果*
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
> st_env.from_path("source")\
>     .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
>     *.select("add(b1,c1)") \ **无任何输出*
>     .insert_into("result_tab")
>
> *无任何输出*
>
>
>
>
>
> st_env.from_path("source")\
>     .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
>     *.select("c1")\*   #正常输出
>
>
>     .insert_into("result_tab")
>
> *正确输出*
>
>
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
import os
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, FileSystem, OldCsv
from pyflink.table.udf import udf


def test_udf():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    st_env = StreamTableEnvironment.create(s_env)

    result_file = "/tmp/scalar_func_basic.csv"
    if os.path.exists(result_file):
        os.remove(result_file)

    st_env.register_table_sink("Results",
                               CsvTableSink(['a'],
                                            [DataTypes.BIGINT()],
                                            result_file))

    st_env.register_function("add",
                             udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
                                 DataTypes.BIGINT()))
    st_env \
        .connect(  # declare the external system to connect to
        Kafka()
            .version("0.11")
            .topic("user")
            .start_from_earliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
    ) \
        .with_format(  # declare a format for this system
        Json()
            .fail_on_missing_field(True)
            .json_schema(
            "{"
            "  type: 'object',"
            "  properties: {"
            "    a: {"
            "      type: 'string'"
            "    },"
            "    b: {"
            "      type: 'string'"
            "    },"
            "    c: {"
            "      type: 'string'"
            "    }"
            "  }"
            "}"
        )
    ) \
        .with_schema(  # declare the schema of the table
        Schema()
            .field("a", DataTypes.STRING())
            .field("b", DataTypes.STRING())
            .field("c", DataTypes.STRING())
    ) \
        .in_append_mode() \
        .register_table_source("source")

    st_env.from_path("source") \
        .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
        .select("add(b1, c1)") \
        .insert_into("Results")
    st_env.execute("test")


if __name__ == '__main__':
    test_udf()

回复