我摘取了plan其中一部分
在过滤数据这里
== Abstract Syntax Tree ==
+- LogicalFilter(condition=[error_exist($1)])
== Optimized Logical Plan ==
+- PythonCalc(select=[message, kubernetes, clusterName,
error_exist(message) AS f0])
== Abstract Syntax Tree ==
LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''],
_c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'],
_c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name],
clusterName=[$2])
+-
hi,
我刚才改了一下你的例子[1],通过from_elements构建一个source表
然后使用我的udf
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
打印出来的结果能够很好的筛选出我想要的数据
但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
source =
hi,
我刚才改了一下你的例子[1],通过from_elements构建一个source表
然后使用我的udf
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
打印出来的结果能够很好的筛选出我想要的数据
但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
source =
@udf(input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
def udf1(msg): #udf1就是简单的筛选log中的error关键字
if msg is None:
return ''
msg_dic = json.loads(msg.strip())
log = msg_dic.get('log').lower()
if 'error' in log or 'fail' in log: