??????????????????????udf????????????????????????????????????????udf????????????????????????????????????jar??
------------------ Original Message ------------------
From: "user-zh" <dian0511...@gmail.com>;
Sent: Tuesday, October 19, 2021, ???? 10:51
To: "user-zh"<user-zh@flink.apache.org>;"xuzh"<huazhe...@foxmail.com>;
Subject: Re: pyflink 1.14.0 udf ??????????????????????????

??????????????????????????????????????????

On Mon, Oct 18, 2021 at 6:44 PM xuzh <huazhe...@foxmail.com> wrote:

> from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.table.expressions import call, row
>
>
> class HashCode(ScalarFunction):
>     def __init__(self):
>         self.factor = 12
>
>     def eval(self, s):
>         return hash(s) * self.factor
>
>
> env_settings = EnvironmentSettings.in_batch_mode()
> btenv = TableEnvironment.create(env_settings)
>
> hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
> # Use the Python user-defined function in the SQL API
> btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
> tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
>                           DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
>                                          DataTypes.FIELD("b", DataTypes.STRING()),
>                                          DataTypes.FIELD("c", DataTypes.FLOAT())]))
> btenv.create_temporary_view("tb2", tb2)
> tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
> print(tb2.to_pandas())
>
> # 3. Create the sink table
> btenv.execute_sql("""
>     CREATE TABLE rs (
>         a int,
>         b string,
>         m bigint
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
>
> tb2.execute_insert("rs").wait()
> print(tb2.to_pandas())
> #