------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<dian0511...@gmail.com&gt;;
????????:&nbsp;2021??10??19??(??????) ????10:51
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"xuzh"<huazhe...@foxmail.com&gt;;

????:&nbsp;Re: pyflink 1.14.0 udf ??????????????????????????



??????????????????????????????????????????

On Mon, Oct 18, 2021 at 6:44 PM xuzh <huazhe...@foxmail.com&gt; wrote:

&gt; from pyflink.table import ScalarFunction, EnvironmentSettings, 
TableEnvironment, DataTypes
&gt; from pyflink.table.udf import udf
&gt; from pyflink.table.expressions import call, row
&gt;
&gt;
&gt; class HashCode(ScalarFunction):
&gt;&nbsp;&nbsp;&nbsp;&nbsp; def __init__(self):
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.factor = 12
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; def eval(self, s):
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return hash(s) * 
self.factor
&gt;
&gt;
&gt; env_settings = EnvironmentSettings.in_batch_mode()
&gt; btenv = TableEnvironment.create(env_settings)
&gt;
&gt; hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
&gt; # ?? SQL API ?????? Python ??????????
&gt; btenv.create_temporary_function("hash_code", udf(HashCode(), 
result_type=DataTypes.BIGINT()))
&gt; tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 DataTypes.FIELD("b", DataTypes.STRING()),
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 DataTypes.FIELD("c", DataTypes.FLOAT())]))
&gt; btenv.create_temporary_view("tb2", tb2)
&gt; tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
&gt; print(tb2.to_pandas())
&gt;
&gt; # 3. ???? sink ??
&gt; btenv.execute_sql("""
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; CREATE TABLE rs (
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; a int,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; b string,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; m bigint
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) WITH (
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
'connector' = 'print'
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt;&nbsp;&nbsp;&nbsp; """)
&gt;
&gt; tb2.execute_insert("rs").wait()
&gt; print(tb2.to_pandas())
&gt; #
&gt;
&gt;
&gt;
&gt;
&gt;

回复