Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
我摘取了plan其中一部分 在过滤数据这里 == Abstract Syntax Tree == +- LogicalFilter(condition=[error_exist($1)]) == Optimized Logical Plan == +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
== Abstract Syntax Tree == LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''], _c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'], _c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name], clusterName=[$2]) +-

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
hi, 我刚才改了一下你的例子[1],通过from_elements构建一个source表 然后使用我的udf source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result() 打印出来的结果能够很好的筛选出我想要的数据 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法 source =

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
hi, 我刚才改了一下你的例子[1],通过from_elements构建一个source表 然后使用我的udf source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result() 打印出来的结果能够很好的筛选出我想要的数据 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法 source =

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 文章 whh_960101
@udf(input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) def udf1(msg): #udf1就是简单的筛选log中的error关键字 if msg is None: return '' msg_dic = json.loads(msg.strip()) log = msg_dic.get('log').lower() if 'error' in log or 'fail' in log: