Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-18 文章 Dian Fu
这个问题是一个bug, 我创建了一个JIRA:https://issues.apache.org/jira/browse/FLINK-19675 出现的条件:在一个Calc里同时有Python UDF、Where条件、复合列访问。 在没有修复之前, 可以这样work around一下: tmp_table = st_env.from_path("source")\ .select("kubernetes.get('container').get('name') as

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
我摘取了plan其中一部分 在过滤数据这里 == Abstract Syntax Tree == +- LogicalFilter(condition=[error_exist($1)]) == Optimized Logical Plan == +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
== Abstract Syntax Tree == LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''], _c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'], _c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name], clusterName=[$2]) +-

Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 Dian Fu
可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables > 在 2020年10月15日,下午7:02,whh_960101 写道: > > hi, > 我刚才改了一下你的例子[1],通过from_elements构建一个source表 > 然后使用我的udf >

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
hi, 我刚才改了一下你的例子[1],通过from_elements构建一个source表 然后使用我的udf source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result() 打印出来的结果能够很好的筛选出我想要的数据 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法 source =

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
hi, 我刚才改了一下你的例子[1],通过from_elements构建一个source表 然后使用我的udf source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result() 打印出来的结果能够很好的筛选出我想要的数据 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法 source =

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
@udf(input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) def udf1(msg): #udf1就是简单的筛选log中的error关键字 if msg is None: return '' msg_dic = json.loads(msg.strip()) log = msg_dic.get('log').lower() if 'error' in log or 'fail' in log:

Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 Xingbo Huang
Hi, 我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的 [1] https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67 Best, Xingbo whh_960101 于2020年10月15日周四 下午2:30写道: > 您好,我使用pyflink时的代码如下,有如下问题: > > > source = st_env.from_path('source') >

pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) table =

pyflink sql中select,where都带udf,其中一个udf失效

2020-10-14 文章 whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) table =