Re: pyflink 1.14.0 udf 执行报错,根据官网写的代码
图挂了,邮件列表不能直接发图片。可以发一下更详细的日志信息吗? On Tue, Oct 19, 2021 at 6:34 PM xuzh wrote: > 错误日志 > Exception in thread Thread-14: > Traceback (most recent call last): > File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in > _bootstrap_inner > self.run() > File > "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", > line 218, in run > while not self._finished.wait(next_call - time.time()): > File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait > signaled = self._cond.wait(timeout) > File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait > gotit = waiter.acquire(True, timeout) > OverflowError: timeout value is too large
回复: pyflink 1.14.0 udf 执行报错,根据官网写的代码
Exception in thread Thread-14: Traceback (most recent call last): File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner self.run() File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run while not self._finished.wait(next_call - time.time()): File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait signaled = self._cond.wait(timeout) File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait gotit = waiter.acquire(True, timeout) OverflowError: timeout value is too large
回复: pyflink 1.14.0 udf 执行报错,根据官网写的代码
---- ??: "user-zh"
回复: pyflink 1.14.0 udf 执行报错,根据官网写的代码
??udfudfjar?? -- -- ??: "user-zh"
回复: pyflink 1.14.0 udf 执行报错,根据官网写的代码
??udfudfjar?? ---- ??: "user-zh"
Re: pyflink 1.14.0 udf 执行报错,根据官网写的代码
我试了一下是可以运行的,可以发一下报错吗? On Mon, Oct 18, 2021 at 6:44 PM xuzh wrote: > from pyflink.table import ScalarFunction, EnvironmentSettings, > TableEnvironment, DataTypes > from pyflink.table.udf import udf > from pyflink.table.expressions import call, row > > > class HashCode(ScalarFunction): > def __init__(self): > self.factor = 12 > > def eval(self, s): > return hash(s) * self.factor > > > env_settings = EnvironmentSettings.in_batch_mode() > btenv = TableEnvironment.create(env_settings) > > hash_code = udf(HashCode(), result_type=DataTypes.BIGINT()) > # 在 SQL API 中使用 Python 自定义函数 > btenv.create_temporary_function("hash_code", udf(HashCode(), > result_type=DataTypes.BIGINT())) > tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)], > DataTypes.ROW([DataTypes.FIELD("a", > DataTypes.INT()), > DataTypes.FIELD("b", > DataTypes.STRING()), > DataTypes.FIELD("c", > DataTypes.FLOAT())])) > btenv.create_temporary_view("tb2", tb2) > tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2") > print(tb2.to_pandas()) > > # 3. 创建 sink 表 > btenv.execute_sql(""" >CREATE TABLE rs ( >a int, >b string, >m bigint >) WITH ( >'connector' = 'print' >) >""") > > tb2.execute_insert("rs").wait() > print(tb2.to_pandas()) > # > > > > >
pyflink 1.14.0 udf 执行报错,根据官网写的代码
from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes from pyflink.table.udf import udf from pyflink.table.expressions import call, row class HashCode(ScalarFunction): def __init__(self): self.factor = 12 def eval(self, s): return hash(s) * self.factor env_settings = EnvironmentSettings.in_batch_mode() btenv = TableEnvironment.create(env_settings) hash_code = udf(HashCode(), result_type=DataTypes.BIGINT()) # 在 SQL API 中使用 Python 自定义函数 btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT())) tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)], DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", DataTypes.FLOAT())])) btenv.create_temporary_view("tb2", tb2) tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2") print(tb2.to_pandas()) # 3. 创建 sink 表 btenv.execute_sql(""" CREATE TABLE rs ( a int, b string, m bigint ) WITH ( 'connector' = 'print' ) """) tb2.execute_insert("rs").wait() print(tb2.to_pandas()) #