Re: pyflink 1.14.0 udf execution error (code written following the official docs)

2021-10-20 by Dian Fu
The image didn't come through; the mailing list does not support sending images directly. Could you share more detailed log output?

On Tue, Oct 19, 2021 at 6:34 PM xuzh  wrote:

> Error log:
> Exception in thread Thread-14:
> Traceback (most recent call last):
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
>     self.run()
>   File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
>     while not self._finished.wait(next_call - time.time()):
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
>     signaled = self._cond.wait(timeout)
>   File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
>     gotit = waiter.acquire(True, timeout)
> OverflowError: timeout value is too large


Re: pyflink 1.14.0 udf execution error (code written following the official docs)

2021-10-19 by xuzh

Exception in thread Thread-14:
Traceback (most recent call last):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
    while not self._finished.wait(next_call - time.time()):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
OverflowError: timeout value is too large
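
For context, this OverflowError is not Flink-specific: CPython's lock acquire rejects any timeout above threading.TIMEOUT_MAX (a platform-dependent cap, roughly 49 days on Windows), which is what the Beam worker thread hits here when next_call - time.time() comes out huge. A minimal sketch, independent of Flink, that reproduces the same error:

import threading

# threading.TIMEOUT_MAX is the largest timeout the underlying lock accepts;
# it is platform-dependent and much smaller on Windows than on Linux.
print(threading.TIMEOUT_MAX)

event = threading.Event()
try:
    # Exceeding the cap raises the same error as in the traceback above:
    # Event.wait -> Condition.wait -> lock.acquire(True, timeout)
    event.wait(timeout=threading.TIMEOUT_MAX + 1)
except OverflowError as exc:
    print(exc)  # prints: timeout value is too large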

Re: pyflink 1.14.0 udf execution error (code written following the official docs)

2021-10-19 by xuzh
It runs fine without the udf; as soon as the udf is used it fails. Could it be a jar package problem?

Re: pyflink 1.14.0 udf execution error (code written following the official docs)

2021-10-18 by Dian Fu
I tried it and it runs fine on my side. Could you send the error message?

On Mon, Oct 18, 2021 at 6:44 PM xuzh  wrote:

> from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.table.expressions import call, row
>
>
> class HashCode(ScalarFunction):
>     def __init__(self):
>         self.factor = 12
>
>     def eval(self, s):
>         return hash(s) * self.factor
>
>
> env_settings = EnvironmentSettings.in_batch_mode()
> btenv = TableEnvironment.create(env_settings)
>
> hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
> # Use the Python UDF in the SQL API
> btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
> tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
>                           DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
>                                          DataTypes.FIELD("b", DataTypes.STRING()),
>                                          DataTypes.FIELD("c", DataTypes.FLOAT())]))
> btenv.create_temporary_view("tb2", tb2)
> tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
> print(tb2.to_pandas())
>
> # 3. Create the sink table
> btenv.execute_sql("""
>     CREATE TABLE rs (
>         a int,
>         b string,
>         m bigint
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
>
> tb2.execute_insert("rs").wait()
> print(tb2.to_pandas())
>


pyflink 1.14.0 udf execution error (code written following the official docs)

2021-10-18 by xuzh
from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import call, row


class HashCode(ScalarFunction):
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return hash(s) * self.factor


env_settings = EnvironmentSettings.in_batch_mode()
btenv = TableEnvironment.create(env_settings)

hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
# Use the Python UDF in the SQL API
btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
                          DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                                         DataTypes.FIELD("b", DataTypes.STRING()),
                                         DataTypes.FIELD("c", DataTypes.FLOAT())]))
btenv.create_temporary_view("tb2", tb2)
tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
print(tb2.to_pandas())

# 3. Create the sink table
btenv.execute_sql("""
    CREATE TABLE rs (
        a int,
        b string,
        m bigint
    ) WITH (
        'connector' = 'print'
    )
""")

tb2.execute_insert("rs").wait()
print(tb2.to_pandas())
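
As an aside, the same UDF can also be declared with the decorator form of pyflink.table.udf.udf, which is equivalent to wrapping a ScalarFunction instance as the snippet above does; a minimal sketch (note that in the original snippet the hash_code variable and the call import are never actually used):

from pyflink.table import DataTypes
from pyflink.table.udf import udf


# Decorator form: the wrapped function plays the role of HashCode.eval
@udf(result_type=DataTypes.BIGINT())
def hash_code(s):
    return hash(s) * 12


# Registration and the SQL query stay unchanged:
# btenv.create_temporary_function("hash_code", hash_code)
# btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")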